http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
new file mode 100644
index 0000000..7fc8cb8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.util.EnumSet;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Set of utility methods to be used by timeline reader web services.
+ */
+final class TimelineReaderWebServicesUtils {
+
+  private TimelineReaderWebServicesUtils() {
+  }
+
+  /**
+   * Parse the passed context information represented as strings and convert
+   * into a {@link TimelineReaderContext} object.
+   * @param clusterId Cluster Id.
+   * @param userId User Id.
+   * @param flowName Flow Name.
+   * @param flowRunId Run id for the flow.
+   * @param appId App Id.
+   * @param entityType Entity Type.
+   * @param entityId Entity Id.
+   * @return a {@link TimelineReaderContext} object.
+   */
+  static TimelineReaderContext createTimelineReaderContext(String clusterId,
+      String userId, String flowName, String flowRunId, String appId,
+      String entityType, String entityId) {
+    return new TimelineReaderContext(parseStr(clusterId), parseStr(userId),
+        parseStr(flowName), parseLongStr(flowRunId), parseStr(appId),
+        parseStr(entityType), parseStr(entityId));
+  }
+
+  /**
+   * Parse the passed filters represented as strings and convert them into a
+   * {@link TimelineEntityFilters} object.
+   * @param limit Limit to number of entities to return.
+   * @param createdTimeStart Created time start for the entities to return.
+   * @param createdTimeEnd Created time end for the entities to return.
+   * @param relatesTo Entities to return must match relatesTo.
+   * @param isRelatedTo Entities to return must match isRelatedTo.
+   * @param infofilters Entities to return must match these info filters.
+   * @param conffilters Entities to return must match these metric filters.
+   * @param metricfilters Entities to return must match these metric filters.
+   * @param eventfilters Entities to return must match these event filters.
+   * @return a {@link TimelineEntityFilters} object.
+   * @throws TimelineParseException if any problem occurs during parsing.
+   */
+  static TimelineEntityFilters createTimelineEntityFilters(String limit,
+      String createdTimeStart, String createdTimeEnd, String relatesTo,
+      String isRelatedTo, String infofilters, String conffilters,
+      String metricfilters, String eventfilters) throws TimelineParseException 
{
+    return new TimelineEntityFilters(parseLongStr(limit),
+        parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd),
+        parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo),
+        parseKVFilters(infofilters, false), parseKVFilters(conffilters, true),
+        parseMetricFilters(metricfilters), parseEventFilters(eventfilters));
+  }
+
+  /**
+   * Parse the passed fields represented as strings and convert them into a
+   * {@link TimelineDataToRetrieve} object.
+   * @param confs confs to retrieve.
+   * @param metrics metrics to retrieve.
+   * @param fields fields to retrieve.
+   * @param metricsLimit upper limit on number of metrics to return.
+   * @return a {@link TimelineDataToRetrieve} object.
+   * @throws TimelineParseException if any problem occurs during parsing.
+   */
+  static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs,
+      String metrics, String fields, String metricsLimit)
+      throws TimelineParseException {
+    return new TimelineDataToRetrieve(parseDataToRetrieve(confs),
+        parseDataToRetrieve(metrics), parseFieldsStr(fields,
+        TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit));
+  }
+
+  /**
+   * Parse a delimited string and convert it into a set of strings. For
+   * instance, if delimiter is ",", then the string should be represented as
+   * "value1,value2,value3".
+   * @param str delimited string.
+   * @param delimiter string is delimited by this delimiter.
+   * @return set of strings.
+   */
+  static TimelineFilterList parseEventFilters(String expr)
+      throws TimelineParseException {
+    return parseFilters(new TimelineParserForExistFilters(expr,
+        TimelineParseConstants.COMMA_CHAR));
+  }
+
+  /**
+   * Parse relation filters.
+   * @param expr Relation filter expression
+   * @return a {@link TimelineFilterList} object.
+   *
+   * @throws Exception if any problem occurs.
+   */
+  static TimelineFilterList parseRelationFilters(String expr)
+      throws TimelineParseException {
+    return parseFilters(new TimelineParserForRelationFilters(expr,
+        TimelineParseConstants.COMMA_CHAR,
+        TimelineParseConstants.COLON_DELIMITER));
+  }
+
+  private static TimelineFilterList parseFilters(TimelineParser parser)
+      throws TimelineParseException {
+    try {
+      return parser.parse();
+    } finally {
+      IOUtils.closeQuietly(parser);
+    }
+  }
+
+  /**
+   * Parses config and info filters.
+   *
+   * @param expr Expression to be parsed.
+   * @param valueAsString true, if value has to be interpreted as string, false
+   *     otherwise. It is true for config filters and false for info filters.
+   * @return a {@link TimelineFilterList} object.
+   * @throws TimelineParseException if any problem occurs during parsing.
+   */
+  static TimelineFilterList parseKVFilters(String expr, boolean valueAsString)
+      throws TimelineParseException {
+    return parseFilters(new TimelineParserForKVFilters(expr, valueAsString));
+  }
+
+  /**
+   * Interprets passed string as set of fields delimited by passed delimiter.
+   * For instance, if delimiter is ",", then the passed string should be
+   * represented as "METRICS,CONFIGS" where the delimited parts of the string
+   * present in {@link Field}.
+   * @param str passed string.
+   * @param delimiter string delimiter.
+   * @return a set of {@link Field}.
+   */
+  static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(delimiter);
+    EnumSet<Field> fieldList = EnumSet.noneOf(Field.class);
+    for (String s : strs) {
+      fieldList.add(Field.valueOf(s.trim().toUpperCase()));
+    }
+    return fieldList;
+  }
+
+  /**
+   * Parses metric filters.
+   *
+   * @param expr Metric filter expression to be parsed.
+   * @return a {@link TimelineFilterList} object.
+   * @throws TimelineParseException if any problem occurs during parsing.
+   */
+  static TimelineFilterList parseMetricFilters(String expr)
+      throws TimelineParseException {
+    return parseFilters(new TimelineParserForNumericFilters(expr));
+  }
+
+  /**
+   * Interpret passed string as a long.
+   * @param str Passed string.
+   * @return long representation if string is not null, null otherwise.
+   */
+  static Long parseLongStr(String str) {
+    return str == null ? null : Long.parseLong(str.trim());
+  }
+
+  /**
+   * Interpret passed string as a integer.
+   * @param str Passed string.
+   * @return integer representation if string is not null, null otherwise.
+   */
+  static Integer parseIntStr(String str) {
+    return str == null ? null : Integer.parseInt(str.trim());
+  }
+
+  /**
+   * Trims the passed string if its not null.
+   * @param str Passed string.
+   * @return trimmed string if string is not null, null otherwise.
+   */
+  static String parseStr(String str) {
+    return str == null ? null : str.trim();
+  }
+
+  /**
+   * Get UGI from HTTP request.
+   * @param req HTTP request.
+   * @return UGI.
+   */
+  static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
+  /**
+   * Get username from caller UGI.
+   * @param callerUGI caller UGI.
+   * @return username.
+   */
+  static String getUserName(UserGroupInformation callerUGI) {
+    return ((callerUGI != null) ? callerUGI.getUserName().trim() : "");
+  }
+
+  /**
+   * Parses confstoretrieve and metricstoretrieve.
+   * @param str String representing confs/metrics to retrieve expression.
+   *
+   * @return a {@link TimelineFilterList} object.
+   * @throws TimelineParseException if any problem occurs during parsing.
+   */
+  static TimelineFilterList parseDataToRetrieve(String expr)
+        throws TimelineParseException {
+    return parseFilters(new TimelineParserForDataToRetrieve(expr));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
new file mode 100644
index 0000000..08e5405
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.util.List;
+
+/**
+ * Used for encoding/decoding UID which will be used for query by UI.
+ */
+enum TimelineUIDConverter {
+  // Flow UID should contain cluster, user and flow name.
+  FLOW_UID {
+    @Override
+    String encodeUID(TimelineReaderContext context) {
+      if (context == null) {
+        return null;
+      }
+      if (context.getClusterId() == null || context.getUserId() == null ||
+          context.getFlowName() == null) {
+        return null;
+      }
+      String[] flowNameTupleArr = {context.getClusterId(), context.getUserId(),
+          context.getFlowName()};
+      return joinAndEscapeUIDParts(flowNameTupleArr);
+    }
+
+    @Override
+    TimelineReaderContext decodeUID(String uId) throws Exception {
+      if (uId == null) {
+        return null;
+      }
+      List<String> flowNameTupleList = splitUID(uId);
+      // Should have 3 parts i.e. cluster, user and flow name.
+      if (flowNameTupleList.size() != 3) {
+        return null;
+      }
+      return new TimelineReaderContext(flowNameTupleList.get(0),
+          flowNameTupleList.get(1), flowNameTupleList.get(2), null,
+          null, null, null);
+    }
+  },
+
+  // Flowrun UID should contain cluster, user, flow name and flowrun id.
+  FLOWRUN_UID{
+    @Override
+    String encodeUID(TimelineReaderContext context) {
+      if (context == null) {
+        return null;
+      }
+      if (context.getClusterId() == null || context.getUserId() == null ||
+          context.getFlowName() == null || context.getFlowRunId() == null) {
+        return null;
+      }
+      String[] flowRunTupleArr = {context.getClusterId(), context.getUserId(),
+          context.getFlowName(), context.getFlowRunId().toString()};
+      return joinAndEscapeUIDParts(flowRunTupleArr);
+    }
+
+    @Override
+    TimelineReaderContext decodeUID(String uId) throws Exception {
+      if (uId == null) {
+        return null;
+      }
+      List<String> flowRunTupleList = splitUID(uId);
+      // Should have 4 parts i.e. cluster, user, flow name and flowrun id.
+      if (flowRunTupleList.size() != 4) {
+        return null;
+      }
+      return new TimelineReaderContext(flowRunTupleList.get(0),
+          flowRunTupleList.get(1), flowRunTupleList.get(2),
+          Long.parseLong(flowRunTupleList.get(3)), null, null, null);
+    }
+  },
+
+  // Application UID should contain cluster, user, flow name, flowrun id
+  // and app id OR cluster and app id(i.e.without flow context info).
+  APPLICATION_UID{
+    @Override
+    String encodeUID(TimelineReaderContext context) {
+      if (context == null) {
+        return null;
+      }
+      if (context.getClusterId() == null || context.getAppId() == null) {
+        return null;
+      }
+      if (context.getUserId() != null && context.getFlowName() != null &&
+          context.getFlowRunId() != null) {
+        // Flow information exists.
+        String[] appTupleArr = {context.getClusterId(), context.getUserId(),
+            context.getFlowName(), context.getFlowRunId().toString(),
+            context.getAppId()};
+        return joinAndEscapeUIDParts(appTupleArr);
+      } else {
+        // Only cluster and app information exists. Flow info does not exist.
+        String[] appTupleArr = {context.getClusterId(), context.getAppId()};
+        return joinAndEscapeUIDParts(appTupleArr);
+      }
+    }
+
+    @Override
+    TimelineReaderContext decodeUID(String uId) throws Exception {
+      if (uId == null) {
+        return null;
+      }
+      List<String> appTupleList = splitUID(uId);
+      // Should have 5 parts i.e. cluster, user, flow name, flowrun id
+      // and app id OR should have 2 parts i.e. cluster and app id.
+      if (appTupleList.size() == 5) {
+        // Flow information exists.
+        return new TimelineReaderContext(appTupleList.get(0),
+            appTupleList.get(1), appTupleList.get(2),
+            Long.parseLong(appTupleList.get(3)), appTupleList.get(4),
+            null, null);
+      } else if (appTupleList.size() == 2) {
+        // Flow information does not exist.
+        return new TimelineReaderContext(appTupleList.get(0), null, null, null,
+            appTupleList.get(1), null, null);
+      } else {
+        return null;
+      }
+    }
+  },
+
+  // Generic Entity UID should contain cluster, user, flow name, flowrun id,
+  // app id, entity type and entity id OR should contain cluster, appid, entity
+  // type and entity id(i.e.without flow context info).
+  GENERIC_ENTITY_UID {
+    @Override
+    String encodeUID(TimelineReaderContext context) {
+      if (context == null) {
+        return null;
+      }
+      if (context.getClusterId() == null || context.getAppId() == null ||
+          context.getEntityType() == null || context.getEntityId() == null) {
+        return null;
+      }
+      if (context.getUserId() != null && context.getFlowName() != null &&
+          context.getFlowRunId() != null) {
+        // Flow information exists.
+        String[] entityTupleArr = {context.getClusterId(), context.getUserId(),
+            context.getFlowName(), context.getFlowRunId().toString(),
+            context.getAppId(), context.getEntityType(), 
context.getEntityId()};
+        return joinAndEscapeUIDParts(entityTupleArr);
+      } else {
+        // Only entity and app information exists. Flow info does not exist.
+        String[] entityTupleArr = {context.getClusterId(), context.getAppId(),
+            context.getEntityType(), context.getEntityId()};
+        return joinAndEscapeUIDParts(entityTupleArr);
+      }
+    }
+
+    @Override
+    TimelineReaderContext decodeUID(String uId) throws Exception {
+      if (uId == null) {
+        return null;
+      }
+      List<String> entityTupleList = splitUID(uId);
+      // Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id,
+      // entity type and entity id OR should have 4 parts i.e. cluster, app id,
+      // entity type and entity id.
+      if (entityTupleList.size() == 7) {
+        // Flow information exists.
+        return new TimelineReaderContext(entityTupleList.get(0),
+            entityTupleList.get(1), entityTupleList.get(2),
+            Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4),
+            entityTupleList.get(5), entityTupleList.get(6));
+      } else if (entityTupleList.size() == 4) {
+        // Flow information does not exist.
+        return new TimelineReaderContext(entityTupleList.get(0), null, null,
+            null, entityTupleList.get(1), entityTupleList.get(2),
+            entityTupleList.get(3));
+      } else {
+        return null;
+      }
+    }
+  };
+
+  /**
+   * Delimiter used for UID.
+   */
+  public static final char UID_DELIMITER_CHAR = '!';
+
+  /**
+   * Escape Character used if delimiter or escape character itself is part of
+   * different components of UID.
+   */
+  public static final char UID_ESCAPE_CHAR = '*';
+
+  /**
+   * Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
+   * @param uid UID to be splitted.
+   * @return a list of different parts of UID split across delimiter.
+   * @throws IllegalArgumentException if UID is not properly escaped.
+   */
+  private static List<String> splitUID(String uid)
+      throws IllegalArgumentException {
+    return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR);
+  }
+
+  /**
+   * Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with
+   * delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if
+   * UID parts contain them.
+   * @param parts an array of UID parts to be joined.
+   * @return a string joined using the delimiter with escape and delimiter
+   *     characters escaped if they are part of the string parts to be joined.
+   *     Returns null if one of the parts is null.
+   */
+  private static String joinAndEscapeUIDParts(String[] parts) {
+    return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR,
+        UID_ESCAPE_CHAR);
+  }
+
+  /**
+   * Encodes UID depending on UID implementation.
+   *
+   * @param context Reader context.
+   * @return UID represented as a string.
+   */
+  abstract String encodeUID(TimelineReaderContext context);
+
+  /**
+   * Decodes UID depending on UID implementation.
+   *
+   * @param uId UID to be decoded.
+   * @return a {@link TimelineReaderContext} object if UID passed can be
+   * decoded, null otherwise.
+   * @throws Exception if any problem occurs while decoding.
+   */
+  abstract TimelineReaderContext decodeUID(String uId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
new file mode 100644
index 0000000..1127f4a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on key-value pair
+ * and the relation between them represented by different relational operators.
+ */
+@Private
+@Unstable
+public class TimelineCompareFilter extends TimelineFilter {
+
+  private TimelineCompareOp compareOp;
+  private String key;
+  private Object value;
+  // If comparison operator is NOT_EQUAL, this flag decides if we should return
+  // the entity if key does not exist.
+  private boolean keyMustExist = true;
+
+  public TimelineCompareFilter() {
+  }
+
+  public TimelineCompareFilter(TimelineCompareOp op, String key, Object val,
+       boolean keyMustExistFlag) {
+    this.compareOp = op;
+    this.key = key;
+    this.value = val;
+    if (op == TimelineCompareOp.NOT_EQUAL) {
+      this.keyMustExist = keyMustExistFlag;
+    } else {
+      this.keyMustExist = true;
+    }
+  }
+
+  public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) {
+    this(op, key, val, true);
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.COMPARE;
+  }
+
+  public TimelineCompareOp getCompareOp() {
+    return compareOp;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String keyToBeSet) {
+    key = keyToBeSet;
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public void setCompareOp(TimelineCompareOp timelineCompareOp,
+      boolean keyExistFlag) {
+    this.compareOp = timelineCompareOp;
+    if (timelineCompareOp == TimelineCompareOp.NOT_EQUAL) {
+      this.keyMustExist = keyExistFlag;
+    }
+  }
+
+  public void setValue(Object val) {
+    value = val;
+  }
+
+  public boolean getKeyMustExist() {
+    return keyMustExist;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode());
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
+    result = prime * result + (keyMustExist ? 1231 : 1237);
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineCompareFilter other = (TimelineCompareFilter) obj;
+    if (compareOp != other.compareOp) {
+      return false;
+    }
+    if (key == null) {
+      if (other.key != null) {
+        return false;
+      }
+    } else if (!key.equals(other.key)) {
+      return false;
+    }
+    if (keyMustExist != other.keyMustExist) {
+      return false;
+    }
+    if (value == null) {
+      if (other.value != null) {
+        return false;
+      }
+    } else if (!value.equals(other.value)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s (%s, %s:%s:%b)",
+        this.getClass().getSimpleName(), this.compareOp.name(),
+        this.key, this.value, this.keyMustExist);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
new file mode 100644
index 0000000..461a7d8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ *  Comparison Operators.
+ */
+@Private
+@Unstable
+public enum TimelineCompareOp {
+  LESS_THAN,
+  LESS_OR_EQUAL,
+  EQUAL,
+  NOT_EQUAL,
+  GREATER_OR_EQUAL,
+  GREATER_THAN
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java
new file mode 100644
index 0000000..b4c8e25
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineExistsFilter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on existence of a
+ * value.
+ */
+@Private
+@Unstable
+public class TimelineExistsFilter extends TimelineFilter {
+
+  private TimelineCompareOp compareOp;
+  private String value;
+
+  public TimelineExistsFilter() {
+  }
+
+  public TimelineExistsFilter(TimelineCompareOp op, String value) {
+    this.value = value;
+    if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+      throw new IllegalArgumentException("CompareOp for exists filter should " 
+
+          "be EQUAL or NOT_EQUAL");
+    }
+    this.compareOp = op;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineExistsFilter other = (TimelineExistsFilter) obj;
+    if (compareOp != other.compareOp) {
+      return false;
+    }
+    if (value == null) {
+      if (other.value != null) {
+        return false;
+      }
+    } else if (!value.equals(other.value)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.EXISTS;
+  }
+
+  public void setValue(String val) {
+    value = val;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setCompareOp(TimelineCompareOp op) {
+    compareOp = op;
+  }
+
+  public TimelineCompareOp getCompareOp() {
+    return compareOp;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s (%s %s)",
+        this.getClass().getSimpleName(), this.compareOp.name(), this.value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
new file mode 100644
index 0000000..5e84976
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Abstract base class extended to implement timeline filters.
+ */
+@Private
+@Unstable
+public abstract class TimelineFilter {
+
+  /**
+   * Lists the different filter types.
+   */
+  @Private
+  @Unstable
+  public enum TimelineFilterType {
+    /**
+     * Combines multiple filters.
+     */
+    LIST,
+    /**
+     * Filter which is used for key-value comparison.
+     */
+    COMPARE,
+    /**
+     * Filter which is used for checking key-value equality.
+     */
+    KEY_VALUE,
+    /**
+     * Filter which is used for checking key-multiple values equality.
+     */
+    KEY_VALUES,
+    /**
+     * Filter which matches prefix for a config or a metric.
+     */
+    PREFIX,
+    /**
+     * Filter which checks existence of a value.
+     */
+    EXISTS
+  }
+
+  public abstract TimelineFilterType getFilterType();
+
+  public String toString() {
+    return this.getClass().getSimpleName();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
new file mode 100644
index 0000000..b4c7ad2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Implementation of {@link TimelineFilter} that represents an ordered list of
+ * timeline filters which will then be evaluated with a specified boolean
+ * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use
+ * timeline filter lists as children of timeline filter lists, you can create a
+ * hierarchy of filters to be evaluated.
+ */
+@Private
+@Unstable
+public class TimelineFilterList extends TimelineFilter {
+  /**
+   * Specifies how filters in the filter list will be evaluated. AND means all
+   * the filters should match and OR means atleast one should match.
+   */
+  @Private
+  @Unstable
+  public static enum Operator {
+    AND,
+    OR
+  }
+
+  private Operator operator;
+  private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>();
+
+  public TimelineFilterList(TimelineFilter...filters) {
+    this(Operator.AND, filters);
+  }
+
+  public TimelineFilterList() {
+    this(Operator.AND);
+  }
+
+  public TimelineFilterList(Operator op) {
+    this.operator = op;
+  }
+
+  public TimelineFilterList(Operator op, TimelineFilter...filters) {
+    this.operator = op;
+    this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters));
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.LIST;
+  }
+
+  /**
+   * Get the filter list.
+   *
+   * @return filterList
+   */
+  public List<TimelineFilter> getFilterList() {
+    return filterList;
+  }
+
+  /**
+   * Get the operator.
+   *
+   * @return operator
+   */
+  public Operator getOperator() {
+    return operator;
+  }
+
+  public void setOperator(Operator op) {
+    operator = op;
+  }
+
+  public void addFilter(TimelineFilter filter) {
+    filterList.add(filter);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result =
+        prime * result + ((filterList == null) ? 0 : filterList.hashCode());
+    result =
+        prime * result + ((operator == null) ? 0 : operator.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineFilterList other = (TimelineFilterList) obj;
+    if (operator != other.operator) {
+      return false;
+    }
+    if (filterList == null) {
+      if (other.filterList != null) {
+        return false;
+      }
+    } else if (!filterList.equals(other.filterList)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("TimelineFilterList %s (%d): %s",
+        this.operator, this.filterList.size(), this.filterList.toString());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..cccae26
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+  private static final Log LOG = LogFactory.getLog(TimelineFilterUtils.class);
+
+  private TimelineFilterUtils() {
+  }
+
+  /**
+   * Returns the equivalent HBase filter list's {@link Operator}.
+   *
+   * @param op timeline filter list operator.
+   * @return HBase filter list's Operator.
+   */
+  private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+    switch (op) {
+    case AND:
+      return Operator.MUST_PASS_ALL;
+    case OR:
+      return Operator.MUST_PASS_ONE;
+    default:
+      throw new IllegalArgumentException("Invalid operator");
+    }
+  }
+
+  /**
+   * Returns the equivalent HBase compare filter's {@link CompareOp}.
+   *
+   * @param op timeline compare op.
+   * @return HBase compare filter's CompareOp.
+   */
+  private static CompareOp getHBaseCompareOp(
+      TimelineCompareOp op) {
+    switch (op) {
+    case LESS_THAN:
+      return CompareOp.LESS;
+    case LESS_OR_EQUAL:
+      return CompareOp.LESS_OR_EQUAL;
+    case EQUAL:
+      return CompareOp.EQUAL;
+    case NOT_EQUAL:
+      return CompareOp.NOT_EQUAL;
+    case GREATER_OR_EQUAL:
+      return CompareOp.GREATER_OR_EQUAL;
+    case GREATER_THAN:
+      return CompareOp.GREATER;
+    default:
+      throw new IllegalArgumentException("Invalid compare operator");
+    }
+  }
+
+  /**
+   * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+   * {@link QualifierFilter}.
+   * @param colPrefix
+   * @param filter
+   * @return a {@link QualifierFilter} object
+   */
+  private static <T> Filter createHBaseColQualPrefixFilter(
+      ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+    return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+        new BinaryPrefixComparator(
+            colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+  }
+
+  /**
+   * Create a HBase {@link QualifierFilter} for the passed column prefix and
+   * compare op.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param compareOp compare op.
+   * @param columnPrefix column prefix.
+   * @return a column qualifier filter.
+   */
+  public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp,
+      ColumnPrefix<T> columnPrefix) {
+    return new QualifierFilter(compareOp,
+        new BinaryPrefixComparator(
+            columnPrefix.getColumnPrefixBytes("")));
+  }
+
+  /**
+   * Create filters for confs or metrics to retrieve. This list includes a
+   * configs/metrics family filter and relevant filters for confs/metrics to
+   * retrieve, if present.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param confsOrMetricToRetrieve configs/metrics to retrieve.
+   * @param columnFamily config or metric column family.
+   * @param columnPrefix config or metric column prefix.
+   * @return a filter list.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  public static <T> Filter createFilterForConfsOrMetricsToRetrieve(
+      TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
+      ColumnPrefix<T> columnPrefix) throws IOException {
+    Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
+        new BinaryComparator(columnFamily.getBytes()));
+    if (confsOrMetricToRetrieve != null &&
+        !confsOrMetricToRetrieve.getFilterList().isEmpty()) {
+      // If confsOrMetricsToRetrive are specified, create a filter list based
+      // on it and family filter.
+      FilterList filter = new FilterList(familyFilter);
+      filter.addFilter(
+          createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
+      return filter;
+    } else {
+      // Only the family filter needs to be added.
+      return familyFilter;
+    }
+  }
+
+  /**
+   * Create 2 HBase {@link SingleColumnValueFilter} filters for the specified
+   * value range represented by start and end value and wraps them inside a
+   * filter list. Start and end value should not be null.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param column Column for which single column value filter is to be 
created.
+   * @param startValue Start value.
+   * @param endValue End value.
+   * @return 2 single column value filters wrapped in a filter list.
+   * @throws IOException if any problem is encountered while encoding value.
+   */
+  public static <T> FilterList createSingleColValueFiltersByRange(
+      Column<T> column, Object startValue, Object endValue) throws IOException 
{
+    FilterList list = new FilterList();
+    Filter singleColValFilterStart = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(startValue),
+        CompareOp.GREATER_OR_EQUAL, true);
+    list.addFilter(singleColValFilterStart);
+
+    Filter singleColValFilterEnd = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(endValue),
+        CompareOp.LESS_OR_EQUAL, true);
+    list.addFilter(singleColValFilterEnd);
+    return list;
+  }
+
+  /**
+   * Creates a HBase {@link SingleColumnValueFilter}.
+   *
+   * @param columnFamily Column Family represented as bytes.
+   * @param columnQualifier Column Qualifier represented as bytes.
+   * @param value Value.
+   * @param compareOp Compare operator.
+   * @param filterIfMissing This flag decides if we should filter the row if 
the
+   *     specified column is missing. This is based on the filter's 
keyMustExist
+   *     field.
+   * @return a {@link SingleColumnValueFilter} object
+   * @throws IOException
+   */
+  private static SingleColumnValueFilter createHBaseSingleColValueFilter(
+      byte[] columnFamily, byte[] columnQualifier, byte[] value,
+      CompareOp compareOp, boolean filterIfMissing) throws IOException {
+    SingleColumnValueFilter singleColValFilter =
+        new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp,
+        new BinaryComparator(value));
+    singleColValFilter.setLatestVersionOnly(true);
+    singleColValFilter.setFilterIfMissing(filterIfMissing);
+    return singleColValFilter;
+  }
+
+  /**
+   * Fetch columns from filter list containing exists and multivalue equality
+   * filters. This is done to fetch only required columns from back-end and
+   * then match event filters or relationships in reader.
+   *
+   * @param filterList filter list.
+   * @return set of columns.
+   */
+  public static Set<String> fetchColumnsFromFilterList(
+      TimelineFilterList filterList) {
+    Set<String> strSet = new HashSet<String>();
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
+        break;
+      case KEY_VALUES:
+        strSet.add(((TimelineKeyValuesFilter)filter).getKey());
+        break;
+      case EXISTS:
+        strSet.add(((TimelineExistsFilter)filter).getValue());
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return strSet;
+  }
+
+  /**
+   * Creates equivalent HBase {@link FilterList} from {@link 
TimelineFilterList}
+   * while converting different timeline filters(of type {@link 
TimelineFilter})
+   * into their equivalent HBase filters.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix column prefix which will be used for conversion.
+   * @param filterList timeline filter list which has to be converted.
+   * @return A {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating the filter list.
+   */
+  public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
+      TimelineFilterList filterList) throws IOException {
+    FilterList list =
+        new FilterList(getHBaseOperator(filterList.getOperator()));
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        list.addFilter(createHBaseFilterList(colPrefix,
+            (TimelineFilterList)filter));
+        break;
+      case PREFIX:
+        list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
+            (TimelinePrefixFilter)filter));
+        break;
+      case COMPARE:
+        TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
+                colPrefix.getValueConverter().
+                    encodeValue(compareFilter.getValue()),
+                getHBaseCompareOp(compareFilter.getCompareOp()),
+                compareFilter.getKeyMustExist()));
+        break;
+      case KEY_VALUE:
+        TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
+                colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
+                getHBaseCompareOp(kvFilter.getCompareOp()),
+                kvFilter.getKeyMustExist()));
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return list;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java
new file mode 100644
index 0000000..8bc8584
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValueFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on key-value pair
+ * being equal or not to the values in back-end store.
+ */
+@Private
+@Unstable
+public class TimelineKeyValueFilter extends TimelineCompareFilter {
+  public TimelineKeyValueFilter() {
+  }
+
+  public TimelineKeyValueFilter(TimelineCompareOp op, String key, Object val,
+      boolean keyMustExistFlag) {
+    super(op, key, val, keyMustExistFlag);
+    if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+      throw new IllegalArgumentException("TimelineCompareOp for equality"
+          + " filter should be EQUAL or NOT_EQUAL");
+    }
+  }
+
+  public TimelineKeyValueFilter(TimelineCompareOp op, String key, Object val) {
+    this(op, key, val, true);
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.KEY_VALUE;
+  }
+
+  public void setCompareOp(TimelineCompareOp timelineCompareOp,
+      boolean keyExistFlag) {
+    if (timelineCompareOp != TimelineCompareOp.EQUAL &&
+        timelineCompareOp != TimelineCompareOp.NOT_EQUAL) {
+      throw new IllegalArgumentException("TimelineCompareOp for equality"
+          + " filter should be EQUAL or NOT_EQUAL");
+    }
+    super.setCompareOp(timelineCompareOp, keyExistFlag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java
new file mode 100644
index 0000000..fe4f6b2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineKeyValuesFilter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on multiple values
+ * for a key and these values being equal or not equal to values in back-end
+ * store.
+ */
+@Private
+@Unstable
+public class TimelineKeyValuesFilter extends TimelineFilter {
+  private TimelineCompareOp compareOp;
+  private String key;
+  private Set<Object> values;
+
+  public TimelineKeyValuesFilter() {
+  }
+
+  public TimelineKeyValuesFilter(TimelineCompareOp op, String key,
+      Set<Object> values) {
+    if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+      throw new IllegalArgumentException("TimelineCompareOp for multi value "
+          + "equality filter should be EQUAL or NOT_EQUAL");
+    }
+    this.compareOp = op;
+    this.key = key;
+    this.values = values;
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.KEY_VALUES;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public Set<Object> getValues() {
+    return values;
+  }
+
+  public void setKeyAndValues(String keyForValues, Set<Object> vals) {
+    key = keyForValues;
+    values = vals;
+  }
+
+  public void setCompareOp(TimelineCompareOp op) {
+    compareOp = op;
+  }
+
+  public TimelineCompareOp getCompareOp() {
+    return compareOp;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s (%s, %s:%s)",
+        this.getClass().getSimpleName(), this.compareOp.name(),
+        this.key, (values == null) ? "" : values.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode());
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
+    result = prime * result + ((values == null) ? 0 : values.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineKeyValuesFilter other = (TimelineKeyValuesFilter) obj;
+    if (compareOp != other.compareOp) {
+      return false;
+    }
+    if (key == null) {
+      if (other.key != null) {
+        return false;
+      }
+    } else if (!key.equals(other.key)) {
+      return false;
+    }
+    if (values == null) {
+      if (other.values != null) {
+        return false;
+      }
+    } else if (!values.equals(other.values)) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
new file mode 100644
index 0000000..bbdc960
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on prefixes.
+ * Prefixes can either match or not match.
+ */
+@Private
+@Unstable
+public class TimelinePrefixFilter extends TimelineFilter {
+
+  private TimelineCompareOp compareOp;
+  private String prefix;
+
+  public TimelinePrefixFilter() {
+  }
+
+  public TimelinePrefixFilter(TimelineCompareOp op, String prefix) {
+    this.prefix = prefix;
+    if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+      throw new IllegalArgumentException("CompareOp for prefix filter should " 
+
+          "be EQUAL or NOT_EQUAL");
+    }
+    this.compareOp = op;
+  }
+
+  @Override
+  public TimelineFilterType getFilterType() {
+    return TimelineFilterType.PREFIX;
+  }
+
+  public String getPrefix() {
+    return prefix;
+  }
+
+  public TimelineCompareOp getCompareOp() {
+    return compareOp;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s (%s %s)",
+        this.getClass().getSimpleName(), this.compareOp.name(), this.prefix);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((compareOp == null) ? 0 : compareOp.hashCode());
+    result = prime * result + ((prefix == null) ? 0 : prefix.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelinePrefixFilter other = (TimelinePrefixFilter) obj;
+    if (compareOp != other.compareOp) {
+      return false;
+    }
+    if (prefix == null) {
+      if (other.prefix != null) {
+        return false;
+      }
+    } else if (!prefix.equals(other.prefix)){
+      return false;
+    }
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
new file mode 100644
index 0000000..116509a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.reader contains 
classes
+ * which can be used across reader. This package contains classes which are
+ * not related to storage implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..047f401
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ *  File System based implementation for TimelineReader. This implementation 
may
+ *  not provide a complete implementation of all the necessary features. This
+ *  implementation is provided solely for basic testing purposes, and should 
not
+ *  be used in a non-test situation.
+ */
+public class FileSystemTimelineReaderImpl extends AbstractService
+    implements TimelineReader {
+
+  private static final Log LOG =
+      LogFactory.getLog(FileSystemTimelineReaderImpl.class);
+
+  private String rootPath;
+  private static final String ENTITIES_DIR = "entities";
+
+  /** Default extension for output files. */
+  private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  @VisibleForTesting
+  /** Default extension for output files. */
+  static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
+
+  @VisibleForTesting
+  /** Config param for timeline service file system storage root. */
+  static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  @VisibleForTesting
+  /** Default value for storage location on local disk. */
+  static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      "/tmp/timeline_service_data";
+
+  private final CSVFormat csvFormat =
+      CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+
+  public FileSystemTimelineReaderImpl() {
+    super(FileSystemTimelineReaderImpl.class.getName());
+  }
+
+  @VisibleForTesting
+  String getRootPath() {
+    return rootPath;
+  }
+
+  private static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+  }
+
+  /**
+   * Deserialize a POJO object from a JSON string.
+   *
+   * @param <T> Describes the type of class to be returned.
+   * @param clazz class to be deserialized.
+   * @param jsonString JSON string to deserialize.
+   * @return An object based on class type. Used typically for
+   *     <cite>TimelineEntity</cite> object.
+   * @throws IOException if the underlying input source has problems during
+   *     parsing.
+   * @throws JsonMappingException  if parser has problems parsing content.
+   * @throws JsonGenerationException if there is a problem in JSON writing.
+   */
+  public static <T> T getTimelineRecordFromJSON(
+      String jsonString, Class<T> clazz)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    return mapper.readValue(jsonString, clazz);
+  }
+
+  private static void fillFields(TimelineEntity finalEntity,
+      TimelineEntity real, EnumSet<Field> fields) {
+    if (fields.contains(Field.ALL)) {
+      fields = EnumSet.allOf(Field.class);
+    }
+    for (Field field : fields) {
+      switch(field) {
+      case CONFIGS:
+        finalEntity.setConfigs(real.getConfigs());
+        break;
+      case METRICS:
+        finalEntity.setMetrics(real.getMetrics());
+        break;
+      case INFO:
+        finalEntity.setInfo(real.getInfo());
+        break;
+      case IS_RELATED_TO:
+        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+        break;
+      case RELATES_TO:
+        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+        break;
+      case EVENTS:
+        finalEntity.setEvents(real.getEvents());
+        break;
+      default:
+        continue;
+      }
+    }
+  }
+
+  private String getFlowRunPath(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId) throws IOException {
+    if (userId != null && flowName != null && flowRunId != null) {
+      return userId + "/" + flowName + "/" + flowRunId;
+    }
+    if (clusterId == null || appId == null) {
+      throw new IOException("Unable to get flow info");
+    }
+    String appFlowMappingFile = rootPath + "/" +  ENTITIES_DIR + "/" +
+        clusterId + "/" + APP_FLOW_MAPPING_FILE;
+    try (BufferedReader reader =
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(
+                     appFlowMappingFile), Charset.forName("UTF-8")));
+         CSVParser parser = new CSVParser(reader, csvFormat)) {
+      for (CSVRecord record : parser.getRecords()) {
+        if (record.size() < 4) {
+          continue;
+        }
+        String applicationId = record.get("APP");
+        if (applicationId != null && !applicationId.trim().isEmpty() &&
+            !applicationId.trim().equals(appId)) {
+          continue;
+        }
+        return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
+            record.get(3).trim();
+      }
+      parser.close();
+    }
+    throw new IOException("Unable to get flow info");
+  }
+
+  private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
+      EnumSet<Field> fieldsToRetrieve) {
+    TimelineEntity entityToBeReturned = new TimelineEntity();
+    entityToBeReturned.setIdentifier(entity.getIdentifier());
+    entityToBeReturned.setCreatedTime(entity.getCreatedTime());
+    if (fieldsToRetrieve != null) {
+      fillFields(entityToBeReturned, entity, fieldsToRetrieve);
+    }
+    return entityToBeReturned;
+  }
+
+  private static boolean isTimeInRange(Long time, Long timeBegin,
+      Long timeEnd) {
+    return (time >= timeBegin) && (time <= timeEnd);
+  }
+
+  private static void mergeEntities(TimelineEntity entity1,
+      TimelineEntity entity2) {
+    // Ideally created time wont change except in the case of issue from 
client.
+    if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) {
+      entity1.setCreatedTime(entity2.getCreatedTime());
+    }
+    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
+      entity1.addConfig(configEntry.getKey(), configEntry.getValue());
+    }
+    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
+      entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
+    }
+    for (Entry<String, Set<String>> isRelatedToEntry :
+        entity2.getIsRelatedToEntities().entrySet()) {
+      String type = isRelatedToEntry.getKey();
+      for (String entityId : isRelatedToEntry.getValue()) {
+        entity1.addIsRelatedToEntity(type, entityId);
+      }
+    }
+    for (Entry<String, Set<String>> relatesToEntry :
+        entity2.getRelatesToEntities().entrySet()) {
+      String type = relatesToEntry.getKey();
+      for (String entityId : relatesToEntry.getValue()) {
+        entity1.addRelatesToEntity(type, entityId);
+      }
+    }
+    for (TimelineEvent event : entity2.getEvents()) {
+      entity1.addEvent(event);
+    }
+    for (TimelineMetric metric2 : entity2.getMetrics()) {
+      boolean found = false;
+      for (TimelineMetric metric1 : entity1.getMetrics()) {
+        if (metric1.getId().equals(metric2.getId())) {
+          metric1.addValues(metric2.getValues());
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        entity1.addMetric(metric2);
+      }
+    }
+  }
+
+  private static TimelineEntity readEntityFromFile(BufferedReader reader)
+      throws IOException {
+    TimelineEntity entity =
+        getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
+    String entityStr = "";
+    while ((entityStr = reader.readLine()) != null) {
+      if (entityStr.trim().isEmpty()) {
+        continue;
+      }
+      TimelineEntity anotherEntity =
+          getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
+      if (!entity.getId().equals(anotherEntity.getId()) ||
+          !entity.getType().equals(anotherEntity.getType())) {
+        continue;
+      }
+      mergeEntities(entity, anotherEntity);
+    }
+    return entity;
+  }
+
+  private Set<TimelineEntity> getEntities(File dir, String entityType,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    // First sort the selected entities based on created/start time.
+    Map<Long, Set<TimelineEntity>> sortedEntities =
+        new TreeMap<>(
+            new Comparator<Long>() {
+            @Override
+            public int compare(Long l1, Long l2) {
+              return l2.compareTo(l1);
+            }
+          }
+        );
+    for (File entityFile : dir.listFiles()) {
+      if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
+        continue;
+      }
+      try (BufferedReader reader =
+               new BufferedReader(
+                   new InputStreamReader(
+                       new FileInputStream(
+                           entityFile), Charset.forName("UTF-8")))) {
+        TimelineEntity entity = readEntityFromFile(reader);
+        if (!entity.getType().equals(entityType)) {
+          continue;
+        }
+        if (!isTimeInRange(entity.getCreatedTime(),
+            filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) {
+          continue;
+        }
+        if (filters.getRelatesTo() != null &&
+            !filters.getRelatesTo().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchRelatesTo(entity,
+            filters.getRelatesTo())) {
+          continue;
+        }
+        if (filters.getIsRelatedTo()  != null &&
+            !filters.getIsRelatedTo().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchIsRelatedTo(entity,
+            filters.getIsRelatedTo())) {
+          continue;
+        }
+        if (filters.getInfoFilters() != null &&
+            !filters.getInfoFilters().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchInfoFilters(entity,
+            filters.getInfoFilters())) {
+          continue;
+        }
+        if (filters.getConfigFilters() != null &&
+            !filters.getConfigFilters().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchConfigFilters(entity,
+            filters.getConfigFilters())) {
+          continue;
+        }
+        if (filters.getMetricFilters() != null &&
+            !filters.getMetricFilters().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchMetricFilters(entity,
+            filters.getMetricFilters())) {
+          continue;
+        }
+        if (filters.getEventFilters() != null &&
+            !filters.getEventFilters().getFilterList().isEmpty() &&
+            !TimelineStorageUtils.matchEventFilters(entity,
+            filters.getEventFilters())) {
+          continue;
+        }
+        TimelineEntity entityToBeReturned = createEntityToBeReturned(
+            entity, dataToRetrieve.getFieldsToRetrieve());
+        Set<TimelineEntity> entitiesCreatedAtSameTime =
+            sortedEntities.get(entityToBeReturned.getCreatedTime());
+        if (entitiesCreatedAtSameTime == null) {
+          entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
+        }
+        entitiesCreatedAtSameTime.add(entityToBeReturned);
+        sortedEntities.put(
+            entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
+      }
+    }
+
+    Set<TimelineEntity> entities = new HashSet<TimelineEntity>();
+    long entitiesAdded = 0;
+    for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
+      for (TimelineEntity entity : entitySet) {
+        entities.add(entity);
+        ++entitiesAdded;
+        if (entitiesAdded >= filters.getLimit()) {
+          return entities;
+        }
+      }
+    }
+    return entities;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    String flowRunPath = getFlowRunPath(context.getUserId(),
+        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
+        context.getAppId());
+    File dir = new File(new File(rootPath, ENTITIES_DIR),
+        context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
+        "/" + context.getEntityType());
+    File entityFile = new File(
+        dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    try (BufferedReader reader =
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+      TimelineEntity entity = readEntityFromFile(reader);
+      return createEntityToBeReturned(
+          entity, dataToRetrieve.getFieldsToRetrieve());
+    } catch (FileNotFoundException e) {
+      LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" +
+          context.getEntityType() + "}. Will send HTTP 404 in response.");
+      return null;
+    }
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    String flowRunPath = getFlowRunPath(context.getUserId(),
+        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
+        context.getAppId());
+    File dir =
+        new File(new File(rootPath, ENTITIES_DIR),
+            context.getClusterId() + "/" + flowRunPath + "/" +
+            context.getAppId() + "/" + context.getEntityType());
+    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to