http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java new file mode 100644 index 0000000..7fc8cb8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.util.EnumSet; + +import javax.servlet.http.HttpServletRequest; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; + +/** + * Set of utility methods to be used by timeline reader web services. + */ +final class TimelineReaderWebServicesUtils { + + private TimelineReaderWebServicesUtils() { + } + + /** + * Parse the passed context information represented as strings and convert + * into a {@link TimelineReaderContext} object. String values are trimmed and + * the flow run id is parsed as a long; null inputs are passed on as null. + * @param clusterId Cluster Id. + * @param userId User Id. + * @param flowName Flow Name. + * @param flowRunId Run id for the flow. + * @param appId App Id. + * @param entityType Entity Type. + * @param entityId Entity Id. + * @return a {@link TimelineReaderContext} object. + */ + static TimelineReaderContext createTimelineReaderContext(String clusterId, + String userId, String flowName, String flowRunId, String appId, + String entityType, String entityId) { + return new TimelineReaderContext(parseStr(clusterId), parseStr(userId), + parseStr(flowName), parseLongStr(flowRunId), parseStr(appId), + parseStr(entityType), parseStr(entityId)); + } + + /** + * Parse the passed filters represented as strings and convert them into a + * {@link TimelineEntityFilters} object. + * @param limit Limit to number of entities to return. + * @param createdTimeStart Created time start for the entities to return. + * @param createdTimeEnd Created time end for the entities to return. + * @param relatesTo Entities to return must match relatesTo. + * @param isRelatedTo Entities to return must match isRelatedTo. + * @param infofilters Entities to return must match these info filters. + * @param conffilters Entities to return must match these config filters. 
+ * @param metricfilters Entities to return must match these metric filters. + * @param eventfilters Entities to return must match these event filters. + * @return a {@link TimelineEntityFilters} object. + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineEntityFilters createTimelineEntityFilters(String limit, + String createdTimeStart, String createdTimeEnd, String relatesTo, + String isRelatedTo, String infofilters, String conffilters, + String metricfilters, String eventfilters) throws TimelineParseException { + return new TimelineEntityFilters(parseLongStr(limit), + parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), + parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo), + parseKVFilters(infofilters, false), parseKVFilters(conffilters, true), + parseMetricFilters(metricfilters), parseEventFilters(eventfilters)); + } + + /** + * Parse the passed fields represented as strings and convert them into a + * {@link TimelineDataToRetrieve} object. + * @param confs confs to retrieve. + * @param metrics metrics to retrieve. + * @param fields fields to retrieve. + * @param metricsLimit upper limit on number of metrics to return. + * @return a {@link TimelineDataToRetrieve} object. + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs, + String metrics, String fields, String metricsLimit) + throws TimelineParseException { + return new TimelineDataToRetrieve(parseDataToRetrieve(confs), + parseDataToRetrieve(metrics), parseFieldsStr(fields, + TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit)); + } + + /** + * Parses event filters. + * + * @param expr Event filter expression to be parsed. + * @return a {@link TimelineFilterList} object. + * @throws TimelineParseException if any problem occurs during parsing. 
+ */ + static TimelineFilterList parseEventFilters(String expr) + throws TimelineParseException { + return parseFilters(new TimelineParserForExistFilters(expr, + TimelineParseConstants.COMMA_CHAR)); + } + + /** + * Parse relation filters. + * @param expr Relation filter expression + * @return a {@link TimelineFilterList} object. + * + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineFilterList parseRelationFilters(String expr) + throws TimelineParseException { + return parseFilters(new TimelineParserForRelationFilters(expr, + TimelineParseConstants.COMMA_CHAR, + TimelineParseConstants.COLON_DELIMITER)); + } + + private static TimelineFilterList parseFilters(TimelineParser parser) + throws TimelineParseException { + try { + return parser.parse(); + } finally { + IOUtils.closeQuietly(parser); + } + } + + /** + * Parses config and info filters. + * + * @param expr Expression to be parsed. + * @param valueAsString true, if value has to be interpreted as string, false + * otherwise. It is true for config filters and false for info filters. + * @return a {@link TimelineFilterList} object. + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineFilterList parseKVFilters(String expr, boolean valueAsString) + throws TimelineParseException { + return parseFilters(new TimelineParserForKVFilters(expr, valueAsString)); + } + + /** + * Interprets passed string as set of fields delimited by passed delimiter. + * For instance, if delimiter is ",", then the passed string should be + * represented as "METRICS,CONFIGS" where the delimited parts of the string + * are present in {@link Field}. Each part is trimmed and converted to upper + * case before it is resolved with {@code Field.valueOf}. + * NOTE(review): {@code toUpperCase()} is called without a locale, so the + * conversion depends on the default locale; consider passing a fixed one. + * @param str passed string. + * @param delimiter string delimiter. 
It is handed to {@code String.split} and + * is therefore interpreted as a regular expression. + * @return a set of {@link Field}, or null if the passed string is null. 
+ */ + static EnumSet<Field> parseFieldsStr(String str, String delimiter) { + if (str == null) { + return null; + } + String[] strs = str.split(delimiter); + EnumSet<Field> fieldList = EnumSet.noneOf(Field.class); + for (String s : strs) { + fieldList.add(Field.valueOf(s.trim().toUpperCase())); + } + return fieldList; + } + + /** + * Parses metric filters. + * + * @param expr Metric filter expression to be parsed. + * @return a {@link TimelineFilterList} object. + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineFilterList parseMetricFilters(String expr) + throws TimelineParseException { + return parseFilters(new TimelineParserForNumericFilters(expr)); + } + + /** + * Interpret passed string as a long. + * @param str Passed string. + * @return long representation if string is not null, null otherwise. + */ + static Long parseLongStr(String str) { + return str == null ? null : Long.parseLong(str.trim()); + } + + /** + * Interpret passed string as an integer. + * @param str Passed string. + * @return integer representation if string is not null, null otherwise. + */ + static Integer parseIntStr(String str) { + return str == null ? null : Integer.parseInt(str.trim()); + } + + /** + * Trims the passed string if it is not null. + * @param str Passed string. + * @return trimmed string if string is not null, null otherwise. + */ + static String parseStr(String str) { + return str == null ? null : str.trim(); + } + + /** + * Get UGI from HTTP request. + * @param req HTTP request. + * @return UGI created for the remote user of the request, or null if the + * request carries no remote user. + */ + static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + + /** + * Get username from caller UGI. + * @param callerUGI caller UGI. 
+ * @return trimmed user name, or an empty string if caller UGI is null. 
+ */ + static String getUserName(UserGroupInformation callerUGI) { + return ((callerUGI != null) ? callerUGI.getUserName().trim() : ""); + } + + /** + * Parses confstoretrieve and metricstoretrieve. + * @param expr String representing confs/metrics to retrieve expression. + * + * @return a {@link TimelineFilterList} object. + * @throws TimelineParseException if any problem occurs during parsing. + */ + static TimelineFilterList parseDataToRetrieve(String expr) + throws TimelineParseException { + return parseFilters(new TimelineParserForDataToRetrieve(expr)); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java new file mode 100644 index 0000000..08e5405 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.util.List; + +/** + * Used for encoding/decoding UID which will be used for query by UI. 
+ */ +enum TimelineUIDConverter { + // Flow UID should contain cluster, user and flow name, in that order. + FLOW_UID { + @Override + String encodeUID(TimelineReaderContext context) { + if (context == null) { + return null; + } + if (context.getClusterId() == null || context.getUserId() == null || + context.getFlowName() == null) { + return null; + } + String[] flowNameTupleArr = {context.getClusterId(), context.getUserId(), + context.getFlowName()}; + return joinAndEscapeUIDParts(flowNameTupleArr); + } + + @Override + TimelineReaderContext decodeUID(String uId) throws Exception { + if (uId == null) { + return null; + } + List<String> flowNameTupleList = splitUID(uId); + // Should have 3 parts i.e. cluster, user and flow name. + if (flowNameTupleList.size() != 3) { + return null; + } + return new TimelineReaderContext(flowNameTupleList.get(0), + flowNameTupleList.get(1), flowNameTupleList.get(2), null, + null, null, null); + } + }, + + // Flowrun UID should contain cluster, user, flow name and flowrun id. + FLOWRUN_UID{ + @Override + String encodeUID(TimelineReaderContext context) { + if (context == null) { + return null; + } + if (context.getClusterId() == null || context.getUserId() == null || + context.getFlowName() == null || context.getFlowRunId() == null) { + return null; + } + String[] flowRunTupleArr = {context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId().toString()}; + return joinAndEscapeUIDParts(flowRunTupleArr); + } + + @Override + TimelineReaderContext decodeUID(String uId) throws Exception { + if (uId == null) { + return null; + } + List<String> flowRunTupleList = splitUID(uId); + // Should have 4 parts i.e. cluster, user, flow name and flowrun id. 
+ if (flowRunTupleList.size() != 4) { + return null; + } + return new TimelineReaderContext(flowRunTupleList.get(0), + flowRunTupleList.get(1), flowRunTupleList.get(2), + Long.parseLong(flowRunTupleList.get(3)), null, null, null); + } + }, + + // Application UID should contain cluster, user, flow name, flowrun id + // and app id OR cluster and app id (i.e. without flow context info). + APPLICATION_UID{ + @Override + String encodeUID(TimelineReaderContext context) { + if (context == null) { + return null; + } + if (context.getClusterId() == null || context.getAppId() == null) { + return null; + } + if (context.getUserId() != null && context.getFlowName() != null && + context.getFlowRunId() != null) { + // Flow information exists. + String[] appTupleArr = {context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId().toString(), + context.getAppId()}; + return joinAndEscapeUIDParts(appTupleArr); + } else { + // Only cluster and app information exists. Flow info does not exist. + String[] appTupleArr = {context.getClusterId(), context.getAppId()}; + return joinAndEscapeUIDParts(appTupleArr); + } + } + + @Override + TimelineReaderContext decodeUID(String uId) throws Exception { + if (uId == null) { + return null; + } + List<String> appTupleList = splitUID(uId); + // Should have 5 parts i.e. cluster, user, flow name, flowrun id + // and app id OR should have 2 parts i.e. cluster and app id. + if (appTupleList.size() == 5) { + // Flow information exists. + return new TimelineReaderContext(appTupleList.get(0), + appTupleList.get(1), appTupleList.get(2), + Long.parseLong(appTupleList.get(3)), appTupleList.get(4), + null, null); + } else if (appTupleList.size() == 2) { + // Flow information does not exist. 
+ return new TimelineReaderContext(appTupleList.get(0), null, null, null, + appTupleList.get(1), null, null); + } else { + return null; + } + } + }, + + // Generic Entity UID should contain cluster, user, flow name, flowrun id, + // app id, entity type and entity id OR should contain cluster, appid, entity + // type and entity id (i.e. without flow context info). + GENERIC_ENTITY_UID { + @Override + String encodeUID(TimelineReaderContext context) { + if (context == null) { + return null; + } + if (context.getClusterId() == null || context.getAppId() == null || + context.getEntityType() == null || context.getEntityId() == null) { + return null; + } + if (context.getUserId() != null && context.getFlowName() != null && + context.getFlowRunId() != null) { + // Flow information exists. + String[] entityTupleArr = {context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId().toString(), + context.getAppId(), context.getEntityType(), context.getEntityId()}; + return joinAndEscapeUIDParts(entityTupleArr); + } else { + // Only entity and app information exists. Flow info does not exist. + String[] entityTupleArr = {context.getClusterId(), context.getAppId(), + context.getEntityType(), context.getEntityId()}; + return joinAndEscapeUIDParts(entityTupleArr); + } + } + + @Override + TimelineReaderContext decodeUID(String uId) throws Exception { + if (uId == null) { + return null; + } + List<String> entityTupleList = splitUID(uId); + // Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id, + // entity type and entity id OR should have 4 parts i.e. cluster, app id, + // entity type and entity id. + if (entityTupleList.size() == 7) { + // Flow information exists. 
+ return new TimelineReaderContext(entityTupleList.get(0), + entityTupleList.get(1), entityTupleList.get(2), + Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4), + entityTupleList.get(5), entityTupleList.get(6)); + } else if (entityTupleList.size() == 4) { + // Flow information does not exist. + return new TimelineReaderContext(entityTupleList.get(0), null, null, + null, entityTupleList.get(1), entityTupleList.get(2), + entityTupleList.get(3)); + } else { + return null; + } + } + }; + + /** + * Delimiter used for UID. + */ + public static final char UID_DELIMITER_CHAR = '!'; + + /** + * Escape Character used if delimiter or escape character itself is part of + * different components of UID. + */ + public static final char UID_ESCAPE_CHAR = '*'; + + /** + * Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}. + * @param uid UID to be split. + * @return a list of different parts of UID split across delimiter. + * @throws IllegalArgumentException if UID is not properly escaped. + */ + private static List<String> splitUID(String uid) + throws IllegalArgumentException { + return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR); + } + + /** + * Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with + * delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if + * UID parts contain them. + * @param parts an array of UID parts to be joined. + * @return a string joined using the delimiter with escape and delimiter + * characters escaped if they are part of the string parts to be joined. + * Returns null if one of the parts is null. + */ + private static String joinAndEscapeUIDParts(String[] parts) { + return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR, + UID_ESCAPE_CHAR); + } + + /** + * Encodes UID depending on UID implementation. + * + * @param context Reader context. 
+ * @return UID represented as a string, or null if the context is null or + * does not carry the parts required by the UID type. 
+ */ + abstract String encodeUID(TimelineReaderContext context); + + /** + * Decodes UID depending on UID implementation. + * + * @param uId UID to be decoded. + * @return a {@link TimelineReaderContext} object if UID passed can be + * decoded, null otherwise. + * @throws Exception if any problem occurs while decoding, e.g. the flow run + * id part is not a valid long. + */ + abstract TimelineReaderContext decodeUID(String uId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java new file mode 100644 index 0000000..1127f4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on key-value pair + * and the relation between them represented by different relational operators. + * NOTE(review): {@link #toString()} dereferences the comparison operator, so + * it throws a NullPointerException on an instance created with the no-arg + * constructor until an operator has been set. + */ +@Private +@Unstable +public class TimelineCompareFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String key; + private Object value; + // If comparison operator is NOT_EQUAL, this flag decides if we should return + // the entity if key does not exist. The constructors force it to true for every other operator; setCompareOp leaves it unchanged in that case. 
+ private boolean keyMustExist = true; + + public TimelineCompareFilter() { + } + + public TimelineCompareFilter(TimelineCompareOp op, String key, Object val, + boolean keyMustExistFlag) { + this.compareOp = op; + this.key = key; + this.value = val; + if (op == TimelineCompareOp.NOT_EQUAL) { + this.keyMustExist = keyMustExistFlag; + } else { + this.keyMustExist = true; + } + } + + public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) { + this(op, key, val, true); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.COMPARE; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + public String getKey() { + return key; + } + + public void setKey(String keyToBeSet) { + key = keyToBeSet; + } + + public Object getValue() { + return value; + } + + public void setCompareOp(TimelineCompareOp timelineCompareOp, + boolean keyExistFlag) { + this.compareOp = timelineCompareOp; + if (timelineCompareOp == TimelineCompareOp.NOT_EQUAL) { + this.keyMustExist = keyExistFlag; + } + } + + public void setValue(Object val) { + value = val; + } + + public boolean getKeyMustExist() { + return keyMustExist; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode()); + result = prime * result + ((key == null) ? 0 : key.hashCode()); + result = prime * result + (keyMustExist ? 1231 : 1237); + result = prime * result + ((value == null) ? 
0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineCompareFilter other = (TimelineCompareFilter) obj; + if (compareOp != other.compareOp) { + return false; + } + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + if (keyMustExist != other.keyMustExist) { + return false; + } + if (value == null) { + if (other.value != null) { + return false; + } + } else if (!value.equals(other.value)) { + return false; + } + return true; + } + + @Override + public String toString() { + return String.format("%s (%s, %s:%s:%b)", + this.getClass().getSimpleName(), this.compareOp.name(), + this.key, this.value, this.keyMustExist); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java new file mode 100644 index 0000000..461a7d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Comparison operators which timeline filters can apply when matching values. + */ +@Private +@Unstable +public enum TimelineCompareOp { + LESS_THAN, + LESS_OR_EQUAL, + EQUAL, + NOT_EQUAL, + GREATER_OR_EQUAL, + GREATER_THAN +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java new file mode 100644 index 0000000..b4c8e25 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on existence of a + * value. The two-argument constructor accepts only the EQUAL and NOT_EQUAL + * operators and throws an IllegalArgumentException for any other operator; + * {@link #setCompareOp(TimelineCompareOp)} performs no such check. + * NOTE(review): {@link #toString()} dereferences the comparison operator, so + * it throws a NullPointerException on an instance created with the no-arg + * constructor until an operator has been set. 
+ */ +@Private +@Unstable +public class TimelineExistsFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String value; + + public TimelineExistsFilter() { + } + + public TimelineExistsFilter(TimelineCompareOp op, String value) { + this.value = value; + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("CompareOp for exists filter should " + + "be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((compareOp == null) ? 
0 : compareOp.hashCode()); + result = prime * result + ((value == null) ? 0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineExistsFilter other = (TimelineExistsFilter) obj; + if (compareOp != other.compareOp) { + return false; + } + if (value == null) { + if (other.value != null) { + return false; + } + } else if (!value.equals(other.value)) { + return false; + } + return true; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.EXISTS; + } + + public void setValue(String val) { + value = val; + } + + public String getValue() { + return value; + } + + public void setCompareOp(TimelineCompareOp op) { + compareOp = op; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + @Override + public String toString() { + return String.format("%s (%s %s)", + this.getClass().getSimpleName(), this.compareOp.name(), this.value); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java new file mode 100644 index 0000000..5e84976 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Abstract base class extended to implement timeline filters. Each concrete + * filter reports its kind through {@link #getFilterType()}; the default + * {@link #toString()} returns the simple class name of the filter. + */ +@Private +@Unstable +public abstract class TimelineFilter { + + /** + * Lists the different filter types. + */ + @Private + @Unstable + public enum TimelineFilterType { + /** + * Combines multiple filters. + */ + LIST, + /** + * Filter which is used for key-value comparison. + */ + COMPARE, + /** + * Filter which is used for checking key-value equality. + */ + KEY_VALUE, + /** + * Filter which is used for checking key-multiple values equality. + */ + KEY_VALUES, + /** + * Filter which matches prefix for a config or a metric. 
+ */ + PREFIX, + /** + * Filter which checks existence of a value. 
+ */ + EXISTS + } + + public abstract TimelineFilterType getFilterType(); + + public String toString() { + return this.getClass().getSimpleName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java new file mode 100644 index 0000000..b4c7ad2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Implementation of {@link TimelineFilter} that represents an ordered list of + * timeline filters which will then be evaluated with a specified boolean + * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use + * timeline filter lists as children of timeline filter lists, you can create a + * hierarchy of filters to be evaluated. + */ +@Private +@Unstable +public class TimelineFilterList extends TimelineFilter { + /** + * Specifies how filters in the filter list will be evaluated. AND means all + * the filters should match and OR means at least one should match. + */ + @Private + @Unstable + public static enum Operator { + AND, + OR + } + + private Operator operator; + private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>(); + + public TimelineFilterList(TimelineFilter...filters) { + this(Operator.AND, filters); + } + + public TimelineFilterList() { + this(Operator.AND); + } + + public TimelineFilterList(Operator op) { + this.operator = op; + } + + public TimelineFilterList(Operator op, TimelineFilter...filters) { + this.operator = op; + this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters)); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.LIST; + } + + /** + * Get the filter list. + * + * @return filterList + */ + public List<TimelineFilter> getFilterList() { + return filterList; + } + + /** + * Get the operator.
+ * + * @return operator + */ + public Operator getOperator() { + return operator; + } + + public void setOperator(Operator op) { + operator = op; + } + + public void addFilter(TimelineFilter filter) { + filterList.add(filter); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = + prime * result + ((filterList == null) ? 0 : filterList.hashCode()); + result = + prime * result + ((operator == null) ? 0 : operator.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineFilterList other = (TimelineFilterList) obj; + if (operator != other.operator) { + return false; + } + if (filterList == null) { + if (other.filterList != null) { + return false; + } + } else if (!filterList.equals(other.filterList)) { + return false; + } + return true; + } + + @Override + public String toString() { + return String.format("TimelineFilterList %s (%d): %s", + this.operator, this.filterList.size(), this.filterList.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java new file mode 100644 index 0000000..cccae26 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; + +/** + * Set of utility methods used by timeline filter classes. + */ +public final class TimelineFilterUtils { + + private static final Log LOG = LogFactory.getLog(TimelineFilterUtils.class); + + private TimelineFilterUtils() { + } + + /** + * Returns the equivalent HBase filter list's {@link Operator}. + * + * @param op timeline filter list operator. + * @return HBase filter list's Operator. + */ + private static Operator getHBaseOperator(TimelineFilterList.Operator op) { + switch (op) { + case AND: + return Operator.MUST_PASS_ALL; + case OR: + return Operator.MUST_PASS_ONE; + default: + throw new IllegalArgumentException("Invalid operator"); + } + } + + /** + * Returns the equivalent HBase compare filter's {@link CompareOp}. + * + * @param op timeline compare op. + * @return HBase compare filter's CompareOp. 
+ */ + private static CompareOp getHBaseCompareOp( + TimelineCompareOp op) { + switch (op) { + case LESS_THAN: + return CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareOp.EQUAL; + case NOT_EQUAL: + return CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareOp.GREATER_OR_EQUAL; + case GREATER_THAN: + return CompareOp.GREATER; + default: + throw new IllegalArgumentException("Invalid compare operator"); + } + } + + /** + * Converts a {@link TimelinePrefixFilter} to an equivalent HBase + * {@link QualifierFilter}. + * @param colPrefix + * @param filter + * @return a {@link QualifierFilter} object + */ + private static <T> Filter createHBaseColQualPrefixFilter( + ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) { + return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), + new BinaryPrefixComparator( + colPrefix.getColumnPrefixBytes(filter.getPrefix()))); + } + + /** + * Create a HBase {@link QualifierFilter} for the passed column prefix and + * compare op. + * + * @param <T> Describes the type of column prefix. + * @param compareOp compare op. + * @param columnPrefix column prefix. + * @return a column qualifier filter. + */ + public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp, + ColumnPrefix<T> columnPrefix) { + return new QualifierFilter(compareOp, + new BinaryPrefixComparator( + columnPrefix.getColumnPrefixBytes(""))); + } + + /** + * Create filters for confs or metrics to retrieve. This list includes a + * configs/metrics family filter and relevant filters for confs/metrics to + * retrieve, if present. + * + * @param <T> Describes the type of column prefix. + * @param confsOrMetricToRetrieve configs/metrics to retrieve. + * @param columnFamily config or metric column family. + * @param columnPrefix config or metric column prefix. + * @return a filter list. + * @throws IOException if any problem occurs while creating the filters. 
+ */ + public static <T> Filter createFilterForConfsOrMetricsToRetrieve( + TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily, + ColumnPrefix<T> columnPrefix) throws IOException { + Filter familyFilter = new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(columnFamily.getBytes())); + if (confsOrMetricToRetrieve != null && + !confsOrMetricToRetrieve.getFilterList().isEmpty()) { + // If confsOrMetricToRetrieve is specified, create a filter list based + // on it and the family filter. + FilterList filter = new FilterList(familyFilter); + filter.addFilter( + createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve)); + return filter; + } else { + // Only the family filter needs to be added. + return familyFilter; + } + } + + /** + * Create 2 HBase {@link SingleColumnValueFilter} filters for the specified + * value range represented by start and end value and wrap them inside a + * filter list. Start and end value should not be null. + * + * @param <T> Describes the type of column prefix. + * @param column Column for which single column value filter is to be created. + * @param startValue Start value. + * @param endValue End value. + * @return 2 single column value filters wrapped in a filter list. + * @throws IOException if any problem is encountered while encoding value.
+ */ + public static <T> FilterList createSingleColValueFiltersByRange( + Column<T> column, Object startValue, Object endValue) throws IOException { + FilterList list = new FilterList(); + Filter singleColValFilterStart = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(startValue), + CompareOp.GREATER_OR_EQUAL, true); + list.addFilter(singleColValFilterStart); + + Filter singleColValFilterEnd = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(endValue), + CompareOp.LESS_OR_EQUAL, true); + list.addFilter(singleColValFilterEnd); + return list; + } + + /** + * Creates a HBase {@link SingleColumnValueFilter}. + * + * @param columnFamily Column Family represented as bytes. + * @param columnQualifier Column Qualifier represented as bytes. + * @param value Value. + * @param compareOp Compare operator. + * @param filterIfMissing This flag decides if we should filter the row if the + * specified column is missing. This is based on the filter's keyMustExist + * field. + * @return a {@link SingleColumnValueFilter} object + * @throws IOException + */ + private static SingleColumnValueFilter createHBaseSingleColValueFilter( + byte[] columnFamily, byte[] columnQualifier, byte[] value, + CompareOp compareOp, boolean filterIfMissing) throws IOException { + SingleColumnValueFilter singleColValFilter = + new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp, + new BinaryComparator(value)); + singleColValFilter.setLatestVersionOnly(true); + singleColValFilter.setFilterIfMissing(filterIfMissing); + return singleColValFilter; + } + + /** + * Fetch columns from filter list containing exists and multivalue equality + * filters. This is done to fetch only required columns from back-end and + * then match event filters or relationships in reader. + * + * @param filterList filter list. 
+ * @return set of columns. + */ + public static Set<String> fetchColumnsFromFilterList( + TimelineFilterList filterList) { + Set<String> strSet = new HashSet<String>(); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter)); + break; + case KEY_VALUES: + strSet.add(((TimelineKeyValuesFilter)filter).getKey()); + break; + case EXISTS: + strSet.add(((TimelineExistsFilter)filter).getValue()); + break; + default: + LOG.info("Unexpected filter type " + filter.getFilterType()); + break; + } + } + return strSet; + } + + /** + * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList} + * while converting different timeline filters(of type {@link TimelineFilter}) + * into their equivalent HBase filters. + * + * @param <T> Describes the type of column prefix. + * @param colPrefix column prefix which will be used for conversion. + * @param filterList timeline filter list which has to be converted. + * @return A {@link FilterList} object. + * @throws IOException if any problem occurs while creating the filter list. + */ + public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix, + TimelineFilterList filterList) throws IOException { + FilterList list = + new FilterList(getHBaseOperator(filterList.getOperator())); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + list.addFilter(createHBaseFilterList(colPrefix, + (TimelineFilterList)filter)); + break; + case PREFIX: + list.addFilter(createHBaseColQualPrefixFilter(colPrefix, + (TimelinePrefixFilter)filter)); + break; + case COMPARE: + TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter; + list.addFilter( + createHBaseSingleColValueFilter( + colPrefix.getColumnFamilyBytes(), + colPrefix.getColumnPrefixBytes(compareFilter.getKey()), + colPrefix.getValueConverter(). 
+ encodeValue(compareFilter.getValue()), + getHBaseCompareOp(compareFilter.getCompareOp()), + compareFilter.getKeyMustExist())); + break; + case KEY_VALUE: + TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter; + list.addFilter( + createHBaseSingleColValueFilter( + colPrefix.getColumnFamilyBytes(), + colPrefix.getColumnPrefixBytes(kvFilter.getKey()), + colPrefix.getValueConverter().encodeValue(kvFilter.getValue()), + getHBaseCompareOp(kvFilter.getCompareOp()), + kvFilter.getKeyMustExist())); + break; + default: + LOG.info("Unexpected filter type " + filter.getFilterType()); + break; + } + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java new file mode 100644 index 0000000..8bc8584 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on key-value pair + * being equal or not to the values in back-end store. + */ +@Private +@Unstable +public class TimelineKeyValueFilter extends TimelineCompareFilter { + public TimelineKeyValueFilter() { + } + + public TimelineKeyValueFilter(TimelineCompareOp op, String key, Object val, + boolean keyMustExistFlag) { + super(op, key, val, keyMustExistFlag); + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("TimelineCompareOp for equality" + + " filter should be EQUAL or NOT_EQUAL"); + } + } + + public TimelineKeyValueFilter(TimelineCompareOp op, String key, Object val) { + this(op, key, val, true); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.KEY_VALUE; + } + + public void setCompareOp(TimelineCompareOp timelineCompareOp, + boolean keyExistFlag) { + if (timelineCompareOp != TimelineCompareOp.EQUAL && + timelineCompareOp != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("TimelineCompareOp for equality" + + " filter should be EQUAL or NOT_EQUAL"); + } + 
super.setCompareOp(timelineCompareOp, keyExistFlag); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java new file mode 100644 index 0000000..fe4f6b2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on multiple values + * for a key and these values being equal or not equal to values in back-end + * store. + */ +@Private +@Unstable +public class TimelineKeyValuesFilter extends TimelineFilter { + private TimelineCompareOp compareOp; + private String key; + private Set<Object> values; + + public TimelineKeyValuesFilter() { + } + + public TimelineKeyValuesFilter(TimelineCompareOp op, String key, + Set<Object> values) { + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("TimelineCompareOp for multi value " + + "equality filter should be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + this.key = key; + this.values = values; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.KEY_VALUES; + } + + public String getKey() { + return key; + } + + public Set<Object> getValues() { + return values; + } + + public void setKeyAndValues(String keyForValues, Set<Object> vals) { + key = keyForValues; + values = vals; + } + + public void setCompareOp(TimelineCompareOp op) { + compareOp = op; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + @Override + public String toString() { + return String.format("%s (%s, %s:%s)", + this.getClass().getSimpleName(), this.compareOp.name(), + this.key, (values == null) ? "" : values.toString()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode()); + result = prime * result + ((key == null) ? 0 : key.hashCode()); + result = prime * result + ((values == null) ? 
0 : values.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineKeyValuesFilter other = (TimelineKeyValuesFilter) obj; + if (compareOp != other.compareOp) { + return false; + } + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + if (values == null) { + if (other.values != null) { + return false; + } + } else if (!values.equals(other.values)) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java new file mode 100644 index 0000000..bbdc960 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on prefixes. + * Prefixes can either match or not match. + */ +@Private +@Unstable +public class TimelinePrefixFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String prefix; + + public TimelinePrefixFilter() { + } + + public TimelinePrefixFilter(TimelineCompareOp op, String prefix) { + this.prefix = prefix; + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("CompareOp for prefix filter should " + + "be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.PREFIX; + } + + public String getPrefix() { + return prefix; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + @Override + public String toString() { + return String.format("%s (%s %s)", + this.getClass().getSimpleName(), this.compareOp.name(), this.prefix); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((compareOp == null) ? 
0 : compareOp.hashCode()); + result = prime * result + ((prefix == null) ? 0 : prefix.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelinePrefixFilter other = (TimelinePrefixFilter) obj; + if (compareOp != other.compareOp) { + return false; + } + if (prefix == null) { + if (other.prefix != null) { + return false; + } + } else if (!prefix.equals(other.prefix)){ + return false; + } + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java new file mode 100644 index 0000000..f7c0705 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores + * timeline filter implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java new file mode 100644 index 0000000..116509a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.reader contains classes + * which can be used across reader. This package contains classes which are + * not related to storage implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java new file mode 100644 index 0000000..047f401 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.annotations.VisibleForTesting; + +/** + * File System based implementation for TimelineReader. This implementation may + * not provide a complete implementation of all the necessary features. This + * implementation is provided solely for basic testing purposes, and should not + * be used in a non-test situation. 
+ */ +public class FileSystemTimelineReaderImpl extends AbstractService + implements TimelineReader { + + private static final Log LOG = + LogFactory.getLog(FileSystemTimelineReaderImpl.class); + + private String rootPath; + private static final String ENTITIES_DIR = "entities"; + + /** Default extension for output files. */ + private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + + @VisibleForTesting + /** Default extension for output files. */ + static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv"; + + @VisibleForTesting + /** Config param for timeline service file system storage root. */ + static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + + @VisibleForTesting + /** Default value for storage location on local disk. */ + static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = + "/tmp/timeline_service_data"; + + private final CSVFormat csvFormat = + CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); + + public FileSystemTimelineReaderImpl() { + super(FileSystemTimelineReaderImpl.class.getName()); + } + + @VisibleForTesting + String getRootPath() { + return rootPath; + } + + private static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); + } + + /** + * Deserialize a POJO object from a JSON string. + * + * @param <T> Describes the type of class to be returned. + * @param clazz class to be deserialized. + * @param jsonString JSON string to deserialize. + * @return An object based on class type. Used typically for + * <cite>TimelineEntity</cite> object. + * @throws IOException if the underlying input source has problems during + * parsing. + * @throws JsonMappingException if parser has problems parsing content. + * @throws JsonGenerationException if there is a problem in JSON writing. 
+ */ + public static <T> T getTimelineRecordFromJSON( + String jsonString, Class<T> clazz) + throws JsonGenerationException, JsonMappingException, IOException { + return mapper.readValue(jsonString, clazz); + } + + private static void fillFields(TimelineEntity finalEntity, + TimelineEntity real, EnumSet<Field> fields) { + if (fields.contains(Field.ALL)) { + fields = EnumSet.allOf(Field.class); + } + for (Field field : fields) { + switch(field) { + case CONFIGS: + finalEntity.setConfigs(real.getConfigs()); + break; + case METRICS: + finalEntity.setMetrics(real.getMetrics()); + break; + case INFO: + finalEntity.setInfo(real.getInfo()); + break; + case IS_RELATED_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case RELATES_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case EVENTS: + finalEntity.setEvents(real.getEvents()); + break; + default: + continue; + } + } + } + + private String getFlowRunPath(String userId, String clusterId, + String flowName, Long flowRunId, String appId) throws IOException { + if (userId != null && flowName != null && flowRunId != null) { + return userId + "/" + flowName + "/" + flowRunId; + } + if (clusterId == null || appId == null) { + throw new IOException("Unable to get flow info"); + } + String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" + + clusterId + "/" + APP_FLOW_MAPPING_FILE; + try (BufferedReader reader = + new BufferedReader(new InputStreamReader( + new FileInputStream( + appFlowMappingFile), Charset.forName("UTF-8"))); + CSVParser parser = new CSVParser(reader, csvFormat)) { + for (CSVRecord record : parser.getRecords()) { + if (record.size() < 4) { + continue; + } + String applicationId = record.get("APP"); + if (applicationId != null && !applicationId.trim().isEmpty() && + !applicationId.trim().equals(appId)) { + continue; + } + return record.get(1).trim() + "/" + record.get(2).trim() + "/" + + record.get(3).trim(); + } + parser.close(); + 
} + throw new IOException("Unable to get flow info"); + } + + private static TimelineEntity createEntityToBeReturned(TimelineEntity entity, + EnumSet<Field> fieldsToRetrieve) { + TimelineEntity entityToBeReturned = new TimelineEntity(); + entityToBeReturned.setIdentifier(entity.getIdentifier()); + entityToBeReturned.setCreatedTime(entity.getCreatedTime()); + if (fieldsToRetrieve != null) { + fillFields(entityToBeReturned, entity, fieldsToRetrieve); + } + return entityToBeReturned; + } + + private static boolean isTimeInRange(Long time, Long timeBegin, + Long timeEnd) { + return (time >= timeBegin) && (time <= timeEnd); + } + + private static void mergeEntities(TimelineEntity entity1, + TimelineEntity entity2) { + // Ideally created time wont change except in the case of issue from client. + if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) { + entity1.setCreatedTime(entity2.getCreatedTime()); + } + for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) { + entity1.addConfig(configEntry.getKey(), configEntry.getValue()); + } + for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) { + entity1.addInfo(infoEntry.getKey(), infoEntry.getValue()); + } + for (Entry<String, Set<String>> isRelatedToEntry : + entity2.getIsRelatedToEntities().entrySet()) { + String type = isRelatedToEntry.getKey(); + for (String entityId : isRelatedToEntry.getValue()) { + entity1.addIsRelatedToEntity(type, entityId); + } + } + for (Entry<String, Set<String>> relatesToEntry : + entity2.getRelatesToEntities().entrySet()) { + String type = relatesToEntry.getKey(); + for (String entityId : relatesToEntry.getValue()) { + entity1.addRelatesToEntity(type, entityId); + } + } + for (TimelineEvent event : entity2.getEvents()) { + entity1.addEvent(event); + } + for (TimelineMetric metric2 : entity2.getMetrics()) { + boolean found = false; + for (TimelineMetric metric1 : entity1.getMetrics()) { + if (metric1.getId().equals(metric2.getId())) { + 
metric1.addValues(metric2.getValues()); + found = true; + break; + } + } + if (!found) { + entity1.addMetric(metric2); + } + } + } + + private static TimelineEntity readEntityFromFile(BufferedReader reader) + throws IOException { + TimelineEntity entity = + getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class); + String entityStr = ""; + while ((entityStr = reader.readLine()) != null) { + if (entityStr.trim().isEmpty()) { + continue; + } + TimelineEntity anotherEntity = + getTimelineRecordFromJSON(entityStr, TimelineEntity.class); + if (!entity.getId().equals(anotherEntity.getId()) || + !entity.getType().equals(anotherEntity.getType())) { + continue; + } + mergeEntities(entity, anotherEntity); + } + return entity; + } + + private Set<TimelineEntity> getEntities(File dir, String entityType, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + // First sort the selected entities based on created/start time. + Map<Long, Set<TimelineEntity>> sortedEntities = + new TreeMap<>( + new Comparator<Long>() { + @Override + public int compare(Long l1, Long l2) { + return l2.compareTo(l1); + } + } + ); + for (File entityFile : dir.listFiles()) { + if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) { + continue; + } + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader( + new FileInputStream( + entityFile), Charset.forName("UTF-8")))) { + TimelineEntity entity = readEntityFromFile(reader); + if (!entity.getType().equals(entityType)) { + continue; + } + if (!isTimeInRange(entity.getCreatedTime(), + filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) { + continue; + } + if (filters.getRelatesTo() != null && + !filters.getRelatesTo().getFilterList().isEmpty() && + !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { + continue; + } + if (filters.getIsRelatedTo() != null && + !filters.getIsRelatedTo().getFilterList().isEmpty() && + 
!TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { + continue; + } + if (filters.getInfoFilters() != null && + !filters.getInfoFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchInfoFilters(entity, + filters.getInfoFilters())) { + continue; + } + if (filters.getConfigFilters() != null && + !filters.getConfigFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchConfigFilters(entity, + filters.getConfigFilters())) { + continue; + } + if (filters.getMetricFilters() != null && + !filters.getMetricFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchMetricFilters(entity, + filters.getMetricFilters())) { + continue; + } + if (filters.getEventFilters() != null && + !filters.getEventFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { + continue; + } + TimelineEntity entityToBeReturned = createEntityToBeReturned( + entity, dataToRetrieve.getFieldsToRetrieve()); + Set<TimelineEntity> entitiesCreatedAtSameTime = + sortedEntities.get(entityToBeReturned.getCreatedTime()); + if (entitiesCreatedAtSameTime == null) { + entitiesCreatedAtSameTime = new HashSet<TimelineEntity>(); + } + entitiesCreatedAtSameTime.add(entityToBeReturned); + sortedEntities.put( + entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime); + } + } + + Set<TimelineEntity> entities = new HashSet<TimelineEntity>(); + long entitiesAdded = 0; + for (Set<TimelineEntity> entitySet : sortedEntities.values()) { + for (TimelineEntity entity : entitySet) { + entities.add(entity); + ++entitiesAdded; + if (entitiesAdded >= filters.getLimit()) { + return entities; + } + } + } + return entities; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + super.serviceInit(conf); + } + + @Override + public TimelineEntity 
getEntity(TimelineReaderContext context, + TimelineDataToRetrieve dataToRetrieve) throws IOException { + String flowRunPath = getFlowRunPath(context.getUserId(), + context.getClusterId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId()); + File dir = new File(new File(rootPath, ENTITIES_DIR), + context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() + + "/" + context.getEntityType()); + File entityFile = new File( + dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader( + new FileInputStream(entityFile), Charset.forName("UTF-8")))) { + TimelineEntity entity = readEntityFromFile(reader); + return createEntityToBeReturned( + entity, dataToRetrieve.getFieldsToRetrieve()); + } catch (FileNotFoundException e) { + LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" + + context.getEntityType() + "}. Will send HTTP 404 in response."); + return null; + } + } + + @Override + public Set<TimelineEntity> getEntities(TimelineReaderContext context, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + String flowRunPath = getFlowRunPath(context.getUserId(), + context.getClusterId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId()); + File dir = + new File(new File(rootPath, ENTITIES_DIR), + context.getClusterId() + "/" + flowRunPath + "/" + + context.getAppId() + "/" + context.getEntityType()); + return getEntities(dir, context.getEntityType(), filters, dataToRetrieve); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org