http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
new file mode 100644
index 0000000..981ee2a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * Context used by a collector during a put: the ids handed to
+ * {@link TimelineContext} plus a flow version.
+ */
+public class TimelineCollectorContext extends TimelineContext {
+  private String flowVersion;
+
+  /**
+   * Creates a context with null ids, a flow run id of 0 and, because the
+   * version passed on is null, the default flow version.
+   */
+  public TimelineCollectorContext() {
+    this(null, null, null, null, 0L, null);
+  }
+
+  /**
+   * Creates a context from the given ids. A null {@code flowVersion} is
+   * replaced by {@link TimelineUtils#DEFAULT_FLOW_VERSION}.
+   */
+  public TimelineCollectorContext(String clusterId, String userId,
+      String flowName, String flowVersion, Long flowRunId, String appId) {
+    super(clusterId, userId, flowName, flowRunId, appId);
+    if (flowVersion == null) {
+      this.flowVersion = TimelineUtils.DEFAULT_FLOW_VERSION;
+    } else {
+      this.flowVersion = flowVersion;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int versionHash = (flowVersion == null) ? 0 : flowVersion.hashCode();
+    // NOTE(review): super.hashCode() is folded in twice (as the seed and as a
+    // final addend). This stays consistent with equals(), but confirm it is
+    // intended.
+    return prime * super.hashCode() + versionHash + super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    // NOTE(review): the cast assumes super.equals() rejects objects of any
+    // other class - confirm against TimelineContext.
+    String otherVersion = ((TimelineCollectorContext) obj).flowVersion;
+    if (flowVersion == null) {
+      return otherVersion == null;
+    }
+    return flowVersion.equals(otherVersion);
+  }
+
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  /**
+   * Sets the flow version exactly as given; unlike the constructor, a null
+   * value is not replaced by the default.
+   */
+  public void setFlowVersion(String version) {
+    this.flowVersion = version;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
new file mode 100644
index 0000000..9758320
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class that manages adding and removing collectors and their lifecycle. It
+ * provides thread-safe access to the collectors inside.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineCollectorManager extends AbstractService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineCollectorManager.class);
+
+  private TimelineWriter writer;
+  private ScheduledExecutorService writerFlusher;
+  private int flushInterval;
+  private boolean writerFlusherRunning;
+
+  /**
+   * Creates and initializes the configured timeline writer, creates the
+   * single-threaded flush scheduler and reads the flush interval.
+   *
+   * @param conf configuration to read the writer class and interval from.
+   * @throws Exception if the writer or the parent service fails to
+   *     initialize.
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    // the writer implementation is pluggable; the HBase writer is the default
+    Class<? extends TimelineWriter> writerClass = conf.getClass(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        HBaseTimelineWriterImpl.class, TimelineWriter.class);
+    writer = ReflectionUtils.newInstance(writerClass, conf);
+    writer.init(conf);
+    // one dedicated thread flushes the writer on a periodic basis
+    writerFlusher = Executors.newSingleThreadScheduledExecutor();
+    flushInterval = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the parent service and the writer, then schedules the periodic
+   * flush task.
+   *
+   * @throws Exception if the parent service or the writer fails to start.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    if (writer != null) {
+      writer.start();
+    }
+    // the first flush fires one interval after start, then once per interval
+    Runnable flushTask = new WriterFlushTask(writer);
+    writerFlusher.scheduleAtFixedRate(
+        flushTask, flushInterval, flushInterval, TimeUnit.SECONDS);
+    writerFlusherRunning = true;
+  }
+
+  // access to this map is synchronized with the map itself
+  private final Map<ApplicationId, TimelineCollector> collectors =
+      Collections.synchronizedMap(
+          new HashMap<ApplicationId, TimelineCollector>());
+
+  /**
+   * Creates the manager.
+   *
+   * @param name name handed to the parent service constructor.
+   */
+  public TimelineCollectorManager(String name) {
+    super(name);
+  }
+
+  /**
+   * @return the writer created during service initialization; null until
+   *     then.
+   */
+  protected TimelineWriter getWriter() {
+    return writer;
+  }
+
+  /**
+   * Registers the collector for the application unless one is already
+   * registered. A newly registered collector is initialized with this
+   * service's configuration, given the shared writer and started before it is
+   * placed in the map; {@link #postPut(ApplicationId, TimelineCollector)} is
+   * invoked afterwards. All of this happens while holding the map lock.
+   *
+   * @param appId Application Id for which collector needs to be put.
+   * @param collector timeline collector to be put.
+   * @throws YarnRuntimeException if there was any exception in initializing
+   *                              and starting the app level service
+   * @return the collector associated with id after the potential put.
+   */
+  public TimelineCollector putIfAbsent(ApplicationId appId,
+      TimelineCollector collector) {
+    synchronized (collectors) {
+      TimelineCollector existing = collectors.get(appId);
+      if (existing != null) {
+        LOG.info("the collector for " + appId + " already exists!");
+        return existing;
+      }
+      try {
+        // initialize, start, and add it to the collection so it can be
+        // cleaned up when the parent shuts down
+        collector.init(getConfig());
+        collector.setWriter(writer);
+        collector.start();
+        collectors.put(appId, collector);
+        LOG.info("the collector for " + appId + " was added");
+        postPut(appId, collector);
+      } catch (Exception e) {
+        throw new YarnRuntimeException(e);
+      }
+      return collector;
+    }
+  }
+
+  /**
+   * Callback handler for the timeline collector manager when a collector has
+   * been added into the collector map. It first runs the
+   * {@link #doPostPut(ApplicationId, TimelineCollector)} hook and then calls
+   * {@code setReadyToAggregate()} on the collector.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    doPostPut(appId, collector);
+    collector.setReadyToAggregate();
+  }
+
+  /**
+   * A template method that will be called by
+   * {@link  #postPut(ApplicationId, TimelineCollector)}. The implementation
+   * in this class does nothing.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
+  }
+
+  /**
+   * Unregisters the collector of the given application. When a collector was
+   * registered, {@link #postRemove(ApplicationId, TimelineCollector)} is
+   * invoked and the collector is then stopped. When none was registered, an
+   * error is logged and nothing else happens.
+   *
+   * @param appId Application Id to remove.
+   * @return whether it was removed successfully
+   */
+  public boolean remove(ApplicationId appId) {
+    TimelineCollector removed = collectors.remove(appId);
+    if (removed == null) {
+      LOG.error("the collector for " + appId + " does not exist!");
+      return false;
+    }
+    postRemove(appId, removed);
+    // stop the service to do clean up
+    removed.stop();
+    LOG.info("The collector service for " + appId + " was removed");
+    return true;
+  }
+
+  /**
+   * Hook called by {@link #remove(ApplicationId)} after the collector has
+   * been taken out of the map and before it is stopped. The implementation in
+   * this class does nothing.
+   * @param appId Application id of the removed collector.
+   * @param collector The collector that has been removed.
+   */
+  protected void postRemove(ApplicationId appId, TimelineCollector collector) {
+
+  }
+
+  /**
+   * Returns the collector registered for the specified application id.
+   *
+   * @param appId Application Id for which we need to get the collector.
+   * @return the collector or null if it does not exist
+   */
+  public TimelineCollector get(ApplicationId appId) {
+    return collectors.get(appId);
+  }
+
+  /**
+   * Returns whether a collector is registered for the specified application
+   * id in this collection.
+   * @param appId Application Id.
+   * @return true if collector for the app id is found, false otherwise.
+   */
+  public boolean containsTimelineCollector(ApplicationId appId) {
+    return collectors.containsKey(appId);
+  }
+
+  /**
+   * Stops every registered collector, then shuts down the flush scheduler and
+   * finally closes the writer before stopping the parent service.
+   *
+   * @throws Exception if a collector, the writer or the parent service fails
+   *     to stop, or if waiting for the flusher is interrupted.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // Stop all collectors, including the case where exactly one is
+    // registered. The map lock is held while iterating because a synchronized
+    // map does not guard iteration over its collection views by itself.
+    synchronized (collectors) {
+      for (TimelineCollector c : collectors.values()) {
+        c.serviceStop();
+      }
+    }
+    // stop the flusher before closing the writer it flushes
+    if (writerFlusher != null) {
+      writerFlusher.shutdown();
+      writerFlusherRunning = false;
+      if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
+        // in reality it should be ample time for the flusher task to finish
+        // even if it times out, writers may be able to handle closing in this
+        // situation fine
+        // proceed to close the writer
+        LOG.warn("failed to stop the flusher task in time. " +
+            "will still proceed to close the writer.");
+      }
+    }
+    if (writer != null) {
+      writer.close();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * @return true after the flush task has been scheduled on start, false
+   *     before that and after the flusher has been shut down on stop.
+   */
+  @VisibleForTesting
+  boolean writerFlusherRunning() {
+    return writerFlusherRunning;
+  }
+
+  /**
+   * Runnable handed to the flush scheduler; each run flushes the timeline
+   * writer it was created with.
+   */
+  private static class WriterFlushTask implements Runnable {
+    private final TimelineWriter target;
+
+    public WriterFlushTask(TimelineWriter writer) {
+      this.target = writer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        target.flush();
+      } catch (Throwable failure) {
+        // we need to handle all exceptions or subsequent execution may be
+        // suppressed
+        LOG.error("exception during timeline writer flush!", failure);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
new file mode 100644
index 0000000..29ef1f8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class TimelineCollectorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineCollectorWebService.class);
+
+  private @Context ServletContext context;
+
+  /**
+   * Gives information about timeline collector. It is bound as the
+   * {@code about} root element and exposes a single {@code About} element.
+   */
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    // description text returned to the caller
+    private String about;
+
+    /** Creates an instance whose description is left unset (null). */
+    public AboutInfo() {
+
+    }
+
+    /**
+     * @param abt description text to expose.
+     */
+    public AboutInfo(String abt) {
+      this.about = abt;
+    }
+
+    /**
+     * @return the description text, exposed as the {@code About} element.
+     */
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    /**
+     * @param abt description text to expose.
+     */
+    public void setAbout(String abt) {
+      this.about = abt;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services. The content type of
+   * the response is reset through {@code init} before the fixed description
+   * is returned.
+   *
+   * @param req Servlet request; not read by this method.
+   * @param res Servlet response.
+   * @return description of timeline web service.
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline Collector API");
+  }
+
+  /**
+   * Accepts writes to the collector, and returns a response. It simply routes
+   * the request to the app level collector. It expects an application as a
+   * context.
+   *
+   * @param req Servlet request.
+   * @param res Servlet response.
+   * @param async flag indicating whether it is an async put or not. "true"
+   *     indicates it is an async call. If null, it is considered false. The
+   *     flag is parsed but not acted upon yet.
+   * @param appId Application Id to which the entities to be put belong to. If
+   *     appId is not there or it cannot be parsed, HTTP 400 will be sent back.
+   * @param entities timeline entities to be put.
+   * @return a Response with appropriate HTTP status.
+   * @throws ForbiddenException if the request carries no remote user.
+   * @throws NotFoundException if no collector is registered for the
+   *     application.
+   * @throws WebApplicationException with an internal server error status if
+   *     putting the entities fails for any other reason.
+   */
+  @PUT
+  @Path("/entities")
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public Response putEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("async") String async,
+      @QueryParam("appid") String appId,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
+    try {
+      ApplicationId appID = parseApplicationId(appId);
+      if (appID == null) {
+        return Response.status(Response.Status.BAD_REQUEST).build();
+      }
+      NodeTimelineCollectorManager collectorManager =
+          (NodeTimelineCollectorManager) context.getAttribute(
+              NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+      TimelineCollector collector = collectorManager.get(appID);
+      if (collector == null) {
+        LOG.error("Application: "+ appId + " is not found");
+        throw new NotFoundException(); // different exception?
+      }
+
+      collector.putEntities(processTimelineEntities(entities), callerUgi);
+      return Response.ok().build();
+    } catch (WebApplicationException e) {
+      // Let exceptions that already carry an HTTP status (such as the "not
+      // found" raised above) through unchanged; the generic handler below
+      // would otherwise report them as an internal server error.
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Converts the textual application id into an {@link ApplicationId}.
+   *
+   * @param appId application id as received in the request; may be null.
+   * @return the parsed id, or null when the input is null or parsing throws
+   *     (the latter is logged as an error).
+   */
+  private static ApplicationId parseApplicationId(String appId) {
+    if (appId == null) {
+      return null;
+    }
+    try {
+      return ApplicationId.fromString(appId.trim());
+    } catch (Exception e) {
+      LOG.error("Invalid application ID: " + appId);
+      return null;
+    }
+  }
+
+  /**
+   * Resets the content type of the response by setting it to null.
+   *
+   * @param response Servlet response to reset.
+   */
+  private static void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  /**
+   * Builds the caller's UGI from the remote user of the request.
+   *
+   * @param req Servlet request.
+   * @return a remote-user UGI, or null when the request has no remote user.
+   */
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    if (remoteUser == null) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(remoteUser);
+  }
+
+  // The process may not be necessary according to the way we write the
+  // backend, but let's keep it for now in case we need to use sub-classes APIs
+  // in the future (e.g., aggregation).
+  /**
+   * Re-wraps each posted entity in the entity sub-class that matches its
+   * type. Entities whose type is not a {@link TimelineEntityType} name are
+   * passed through unchanged.
+   *
+   * @param entities entities as received in the request.
+   * @return a new container holding the converted entities.
+   */
+  private static TimelineEntities processTimelineEntities(
+      TimelineEntities entities) {
+    TimelineEntities converted = new TimelineEntities();
+    for (TimelineEntity entity : entities.getEntities()) {
+      TimelineEntityType type;
+      try {
+        type = TimelineEntityType.valueOf(entity.getType());
+      } catch (IllegalArgumentException e) {
+        // not one of the predefined types: keep the entity as it is
+        converted.addEntity(entity);
+        continue;
+      }
+      switch (type) {
+      case YARN_CLUSTER:
+        converted.addEntity(new ClusterEntity(entity));
+        break;
+      case YARN_FLOW_RUN:
+        converted.addEntity(new FlowRunEntity(entity));
+        break;
+      case YARN_APPLICATION:
+        converted.addEntity(new ApplicationEntity(entity));
+        break;
+      case YARN_APPLICATION_ATTEMPT:
+        converted.addEntity(new ApplicationAttemptEntity(entity));
+        break;
+      case YARN_CONTAINER:
+        converted.addEntity(new ContainerEntity(entity));
+        break;
+      case YARN_QUEUE:
+        converted.addEntity(new QueueEntity(entity));
+        break;
+      case YARN_USER:
+        converted.addEntity(new UserEntity(entity));
+        break;
+      default:
+        // NOTE(review): an entity whose type is a known enum constant without
+        // a case above is dropped from the result - confirm this is intended.
+        break;
+      }
+    }
+    return converted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/package-info.java
new file mode 100644
index 0000000..1f7dd23
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.collector contains
+ * classes which can be used across collector. This package contains classes
+ * which are not related to storage implementations though.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/package-info.java
new file mode 100644
index 0000000..58e23f0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice contains classes to
+ * be used across timeline reader and collector.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java
new file mode 100644
index 0000000..325050a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Encapsulates information regarding which data to retrieve for each entity
+ * while querying.<br>
+ * Data to retrieve contains the following :<br>
+ * <ul>
+ * <li><b>confsToRetrieve</b> - Used for deciding which configs to return
+ * in response. This is represented as a {@link TimelineFilterList} object
+ * containing {@link TimelinePrefixFilter} objects. These can either be
+ * exact config keys or prefixes which are then compared against config
+ * keys to decide configs (inside entities) to return in response. If null
+ * or empty, all configurations will be fetched if fieldsToRetrieve
+ * contains {@link Field#CONFIGS} or {@link Field#ALL}. This should not be
+ * confused with configFilters which is used to decide which entities to
+ * return instead.</li>
+ * <li><b>metricsToRetrieve</b> - Used for deciding which metrics to return
+ * in response. This is represented as a {@link TimelineFilterList} object
+ * containing {@link TimelinePrefixFilter} objects. These can either be
+ * exact metric ids or prefixes which are then compared against metric
+ * ids to decide metrics (inside entities) to return in response. If null
+ * or empty, all metrics will be fetched if fieldsToRetrieve contains
+ * {@link Field#METRICS} or {@link Field#ALL}. This should not be confused
+ * with metricFilters which is used to decide which entities to return
+ * instead.</li>
+ * <li><b>fieldsToRetrieve</b> - Specifies which fields of the entity
+ * object to retrieve, see {@link Field}. If null, retrieves 3 fields,
+ * namely entity id, entity type and entity created time. All fields will
+ * be returned if {@link Field#ALL} is specified.</li>
+ * <li><b>metricsLimit</b> - If fieldsToRetrieve contains METRICS/ALL or
+ * metricsToRetrieve is specified, this limit defines an upper limit to the
+ * number of metrics to return. This parameter is ignored if METRICS are not to
+ * be fetched.</li>
+ * </ul>
+ */
+@Private
+@Unstable
+public class TimelineDataToRetrieve {
+  private TimelineFilterList confsToRetrieve;
+  private TimelineFilterList metricsToRetrieve;
+  private EnumSet<Field> fieldsToRetrieve;
+  private Integer metricsLimit;
+
+  /**
+   * Default limit of number of metrics to return.
+   */
+  public static final Integer DEFAULT_METRICS_LIMIT = 1;
+
+  public TimelineDataToRetrieve() {
+    this(null, null, null, null);
+  }
+
+  public TimelineDataToRetrieve(TimelineFilterList confs,
+      TimelineFilterList metrics, EnumSet<Field> fields,
+      Integer limitForMetrics) {
+    this.confsToRetrieve = confs;
+    this.metricsToRetrieve = metrics;
+    this.fieldsToRetrieve = fields;
+    if (limitForMetrics == null || limitForMetrics < 1) {
+      this.metricsLimit = DEFAULT_METRICS_LIMIT;
+    } else {
+      this.metricsLimit = limitForMetrics;
+    }
+
+    if (this.fieldsToRetrieve == null) {
+      this.fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+  }
+
+  public TimelineFilterList getConfsToRetrieve() {
+    return confsToRetrieve;
+  }
+
+  public void setConfsToRetrieve(TimelineFilterList confs) {
+    this.confsToRetrieve = confs;
+  }
+
+  public TimelineFilterList getMetricsToRetrieve() {
+    return metricsToRetrieve;
+  }
+
+  public void setMetricsToRetrieve(TimelineFilterList metrics) {
+    this.metricsToRetrieve = metrics;
+  }
+
+  public EnumSet<Field> getFieldsToRetrieve() {
+    return fieldsToRetrieve;
+  }
+
+  public void setFieldsToRetrieve(EnumSet<Field> fields) {
+    this.fieldsToRetrieve = fields;
+  }
+
+  /**
+   * Adds configs and metrics fields to fieldsToRetrieve(if they are not
+   * present) if confsToRetrieve and metricsToRetrieve are specified.
+   */
+  public void addFieldsBasedOnConfsAndMetricsToRetrieve() {
+    if (!fieldsToRetrieve.contains(Field.CONFIGS) && confsToRetrieve != null &&
+        !confsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.CONFIGS);
+    }
+    if (!fieldsToRetrieve.contains(Field.METRICS) &&
+        metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.METRICS);
+    }
+  }
+
+  public Integer getMetricsLimit() {
+    return metricsLimit;
+  }
+
+  public void setMetricsLimit(Integer limit) {
+    if (limit == null || limit < 1) {
+      this.metricsLimit = DEFAULT_METRICS_LIMIT;
+    } else {
+      this.metricsLimit = limit;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
new file mode 100644
index 0000000..8f2b725
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+
+/**
+ * Encapsulates information regarding the filters to apply while querying. These
+ * filters restrict the number of entities to return.<br>
+ * Filters contain the following :<br>
+ * <ul>
+ * <li><b>limit</b> - A limit on the number of entities to return. If null or
+ * {@literal < 0}, defaults to {@link #DEFAULT_LIMIT}. The maximum possible
+ * value for limit can be {@link Long#MAX_VALUE}.</li>
+ * <li><b>createdTimeBegin</b> - Matched entities should not be created
+ * before this timestamp. If null or {@literal <=0}, defaults to 0.</li>
+ * <li><b>createdTimeEnd</b> - Matched entities should not be created after
+ * this timestamp. If null or {@literal < 0}, defaults to
+ * {@link Long#MAX_VALUE}.</li>
+ * <li><b>relatesTo</b> - Matched entities should or should not relate to given
+ * entities depending on what's specified in the filter. The entities in
+ * relatesTo are identified by entity type and id. This is represented as
+ * a {@link TimelineFilterList} object containing
+ * {@link TimelineKeyValuesFilter} objects, each of which contains a
+ * set of values for a key and the comparison operator (equals/not equals). The
+ * key which represents the entity type is a string and values are a set of
+ * entity identifiers (also string). As it is a filter list, relatesTo can be
+ * evaluated with logical AND/OR and we can create a hierarchy of these
+ * {@link TimelineKeyValuesFilter} objects. If null or empty, the relations are
+ * not matched.</li>
+ * <li><b>isRelatedTo</b> - Matched entities should or should not be related
+ * to given entities depending on what's specified in the filter. The entities
+ * in isRelatedTo are identified by entity type and id.  This is represented as
+ * a {@link TimelineFilterList} object containing
+ * {@link TimelineKeyValuesFilter} objects, each of which contains a
+ * set of values for a key and the comparison operator (equals/not equals). The
+ * key which represents the entity type is a string and values are a set of
+ * entity identifiers (also string). As it is a filter list, isRelatedTo can be
+ * evaluated with logical AND/OR and we can create a hierarchy of these
+ * {@link TimelineKeyValuesFilter} objects. If null or empty, the relations are
+ * not matched.</li>
+ * <li><b>infoFilters</b> - Matched entities should have exact matches to
+ * the given info and should be either equal or not equal to given value
+ * depending on what's specified in the filter. This is represented as a
+ * {@link TimelineFilterList} object containing {@link TimelineKeyValueFilter}
+ * objects, each of which contains key-value pairs with a comparison operator
+ * (equals/not equals). The key which represents the info key is a string but
+ * value can be any object. As it is a filter list, info filters can be
+ * evaluated with logical AND/OR and we can create a hierarchy of these
+ * key-value pairs. If null or empty, the filter is not applied.</li>
+ * <li><b>configFilters</b> - Matched entities should have exact matches to
+ * the given configurations and should be either equal or not equal to given
+ * value depending on what's specified in the filter. This is represented as a
+ * {@link TimelineFilterList} object containing {@link TimelineKeyValueFilter}
+ * objects, each of which contains key-value pairs with a comparison operator
+ * (equals/not equals). Both key (which represents config name) and value (which
+ * is config value) are strings. As it is a filter list, config filters can be
+ * evaluated with logical AND/OR and we can create a hierarchy of these
+ * {@link TimelineKeyValueFilter} objects. If null or empty, the filter is not
+ * applied.</li>
+ * <li><b>metricFilters</b> - Matched entities should contain the given
+ * metrics and satisfy the specified relation with the value. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelineCompareFilter} objects, each of which contains key-value pairs
+ * along with the specified relational/comparison operator represented by
+ * {@link TimelineCompareOp}.  The key is a string and value is integer
+ * (Short/Integer/Long). As it is a filter list, metric filters can be evaluated
+ * with logical AND/OR and we can create a hierarchy of these
+ * {@link TimelineCompareFilter} objects. If null or empty, the filter is not
+ * applied.</li>
+ * <li><b>eventFilters</b> - Matched entities should contain or not contain the
+ * given events. This is represented as a {@link TimelineFilterList} object
+ * containing {@link TimelineExistsFilter} objects, each of which contains a
+ * value which must or must not exist depending on comparison operator specified
+ * in the filter. For event filters, the value represents an event id. As it is
+ * a filter list, event filters can be evaluated with logical AND/OR and we can
+ * create a hierarchy of these {@link TimelineExistsFilter} objects. If null or
+ * empty, the filter is not applied.</li>
+ * </ul>
+ */
+@Private
+@Unstable
+public class TimelineEntityFilters {
+  private long limit;
+  private long createdTimeBegin;
+  private long createdTimeEnd;
+  private TimelineFilterList relatesTo;
+  private TimelineFilterList isRelatedTo;
+  private TimelineFilterList infoFilters;
+  private TimelineFilterList configFilters;
+  private TimelineFilterList metricFilters;
+  private TimelineFilterList eventFilters;
+  private static final long DEFAULT_BEGIN_TIME = 0L;
+  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  /**
+   * Default limit of number of entities to return for getEntities API.
+   */
+  public static final long DEFAULT_LIMIT = 100;
+
+  public TimelineEntityFilters() {
+    this(null, null, null, null, null, null, null, null, null);
+  }
+
+  public TimelineEntityFilters(
+      Long entityLimit, Long timeBegin, Long timeEnd,
+      TimelineFilterList entityRelatesTo,
+      TimelineFilterList entityIsRelatedTo,
+      TimelineFilterList entityInfoFilters,
+      TimelineFilterList entityConfigFilters,
+      TimelineFilterList  entityMetricFilters,
+      TimelineFilterList entityEventFilters) {
+    if (entityLimit == null || entityLimit < 0) {
+      this.limit = DEFAULT_LIMIT;
+    } else {
+      this.limit = entityLimit;
+    }
+    if (timeBegin == null || timeBegin < 0) {
+      this.createdTimeBegin = DEFAULT_BEGIN_TIME;
+    } else {
+      this.createdTimeBegin = timeBegin;
+    }
+    if (timeEnd == null || timeEnd < 0) {
+      this.createdTimeEnd = DEFAULT_END_TIME;
+    } else {
+      this.createdTimeEnd = timeEnd;
+    }
+    this.relatesTo = entityRelatesTo;
+    this.isRelatedTo = entityIsRelatedTo;
+    this.infoFilters = entityInfoFilters;
+    this.configFilters = entityConfigFilters;
+    this.metricFilters = entityMetricFilters;
+    this.eventFilters = entityEventFilters;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  public void setLimit(Long entityLimit) {
+    if (entityLimit == null || entityLimit < 0) {
+      this.limit = DEFAULT_LIMIT;
+    } else {
+      this.limit = entityLimit;
+    }
+  }
+
+  public long getCreatedTimeBegin() {
+    return createdTimeBegin;
+  }
+
+  public void setCreatedTimeBegin(Long timeBegin) {
+    if (timeBegin == null || timeBegin < 0) {
+      this.createdTimeBegin = DEFAULT_BEGIN_TIME;
+    } else {
+      this.createdTimeBegin = timeBegin;
+    }
+  }
+
+  public long getCreatedTimeEnd() {
+    return createdTimeEnd;
+  }
+
+  public void setCreatedTimeEnd(Long timeEnd) {
+    if (timeEnd == null || timeEnd < 0) {
+      this.createdTimeEnd = DEFAULT_END_TIME;
+    } else {
+      this.createdTimeEnd = timeEnd;
+    }
+  }
+
+  public TimelineFilterList getRelatesTo() {
+    return relatesTo;
+  }
+
+  public void setRelatesTo(TimelineFilterList relations) {
+    this.relatesTo = relations;
+  }
+
+  public TimelineFilterList getIsRelatedTo() {
+    return isRelatedTo;
+  }
+
+  public void setIsRelatedTo(TimelineFilterList relations) {
+    this.isRelatedTo = relations;
+  }
+
+  public TimelineFilterList getInfoFilters() {
+    return infoFilters;
+  }
+
+  public void setInfoFilters(TimelineFilterList filters) {
+    this.infoFilters = filters;
+  }
+
+  public TimelineFilterList getConfigFilters() {
+    return configFilters;
+  }
+
+  public void setConfigFilters(TimelineFilterList filters) {
+    this.configFilters = filters;
+  }
+
+  public TimelineFilterList getMetricFilters() {
+    return metricFilters;
+  }
+
+  public void setMetricFilters(TimelineFilterList filters) {
+    this.metricFilters = filters;
+  }
+
+  public TimelineFilterList getEventFilters() {
+    return eventFilters;
+  }
+
+  public void setEventFilters(TimelineFilterList filters) {
+    this.eventFilters = filters;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseConstants.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseConstants.java
new file mode 100644
index 0000000..662a102
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseConstants.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+/**
+ * Set of constants used while parsing filter expressions.
+ */
+final class TimelineParseConstants {
+  private TimelineParseConstants() {
+  }
+  static final String COMMA_DELIMITER = ",";
+  static final String COLON_DELIMITER = ":";
+  static final char NOT_CHAR = '!';
+  static final char SPACE_CHAR = ' ';
+  static final char OPENING_BRACKET_CHAR = '(';
+  static final char CLOSING_BRACKET_CHAR = ')';
+  static final char COMMA_CHAR = ',';
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseException.java
new file mode 100644
index 0000000..8d4a5dc
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParseException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+/**
+ * Exception thrown to indicate that a timeline filter expression cannot be
+ * parsed.
+ */
+class TimelineParseException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public TimelineParseException() {
+    super();
+  }
+
+  public TimelineParseException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParser.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParser.java
new file mode 100644
index 0000000..6b461a0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+
+@Private
+@Unstable
+interface TimelineParser extends Closeable {
+  /**
+   * Method used for parsing.
+   *
+   * @return a {@link TimelineFilterList} object.
+   * @throws TimelineParseException if any problem occurs while parsing.
+   */
+  TimelineFilterList parse() throws TimelineParseException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForCompareExpr.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForCompareExpr.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForCompareExpr.java
new file mode 100644
index 0000000..1b020d9
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForCompareExpr.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.util.Deque;
+import java.util.LinkedList;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+
+/**
+ * Abstract class for parsing compare expressions.
+ * Compare expressions are of the form :
+ * (&lt;key&gt; &lt;compareop&gt; &lt;value&gt;) &lt;op&gt; (&lt;key
+ * &gt; &lt;compareop&gt; &lt;value&gt;)
+ * compareop is used to compare value of the specified key in the backend
+ * storage. compareop can be :
+ * 1. eq - Equals
+ * 2. ne - Not equals (matches if key does not exist)
+ * 3. ene - Exists and not equals (key must exist for match to occur)
+ * 4. lt - Less than
+ * 5. gt - Greater than
+ * 6. le - Less than or equals
+ * 7. ge - Greater than or equals
+ * The compareops supported would depend on implementation. For instance, all
+ * the above compareops will be supported for metric filters but only eq, ne and
+ * ene would be supported for KV filters like config/info filters.
+ *
+ * op is a logical operator and can be either AND or OR.
+ *
+ * The way values will be interpreted would also depend on implementation
+ *
+ * A typical compare expression would look as under:
+ * ((key1 eq val1 OR key2 ne val2) AND (key5 gt val45))
+ */
+@Private
+@Unstable
+abstract class TimelineParserForCompareExpr implements TimelineParser {
+  private enum ParseState {
+    PARSING_KEY,
+    PARSING_VALUE,
+    PARSING_OP,
+    PARSING_COMPAREOP
+  }
+  // Main expression.
+  private final String expr;
+  // Expression in lower case.
+  private final String exprInLowerCase;
+  private final String exprName;
+  private int offset = 0;
+  private int kvStartOffset = 0;
+  private final int exprLength;
+  private ParseState currentParseState = ParseState.PARSING_KEY;
+  // Linked list implemented as a stack.
+  private Deque<TimelineFilterList> filterListStack = new LinkedList<>();
+  private TimelineFilter currentFilter = null;
+  private TimelineFilterList filterList = null;
+  public TimelineParserForCompareExpr(String expression, String name) {
+    if (expression != null) {
+      expr = expression.trim();
+      exprLength = expr.length();
+      exprInLowerCase = expr.toLowerCase();
+    } else {
+      expr = null;
+      exprInLowerCase = null;
+      exprLength = 0;
+    }
+    this.exprName = name;
+  }
+
+  protected TimelineFilter getCurrentFilter() {
+    return currentFilter;
+  }
+
+  protected TimelineFilter getFilterList() {
+    return filterList;
+  }
+
+  protected abstract TimelineFilter createFilter();
+
+  protected abstract Object parseValue(String strValue)
+      throws TimelineParseException;
+
+  protected abstract void setCompareOpToCurrentFilter(
+      TimelineCompareOp compareOp, boolean keyMustExistFlag)
+      throws TimelineParseException;
+
+  protected abstract void setValueToCurrentFilter(Object value);
+
+  private void handleSpaceChar() throws TimelineParseException {
+    if (currentParseState == ParseState.PARSING_KEY ||
+        currentParseState == ParseState.PARSING_VALUE) {
+      if (kvStartOffset == offset) {
+        kvStartOffset++;
+        offset++;
+        return;
+      }
+      String str = expr.substring(kvStartOffset, offset);
+      if (currentParseState == ParseState.PARSING_KEY) {
+        if (currentFilter == null) {
+          currentFilter = createFilter();
+        }
+        ((TimelineCompareFilter)currentFilter).setKey(str);
+        currentParseState = ParseState.PARSING_COMPAREOP;
+      } else if (currentParseState == ParseState.PARSING_VALUE) {
+        if (currentFilter != null) {
+          setValueToCurrentFilter(parseValue(str));
+        }
+        currentParseState = ParseState.PARSING_OP;
+      }
+    }
+    offset++;
+  }
+
+  private void handleOpeningBracketChar() throws TimelineParseException {
+    if (currentParseState != ParseState.PARSING_KEY) {
+      throw new TimelineParseException("Encountered unexpected opening " +
+          "bracket while parsing " + exprName + ".");
+    }
+    offset++;
+    kvStartOffset = offset;
+    filterListStack.push(filterList);
+    filterList = null;
+  }
+
+  private void handleClosingBracketChar() throws TimelineParseException {
+    if (currentParseState != ParseState.PARSING_VALUE &&
+        currentParseState != ParseState.PARSING_OP) {
+      throw new TimelineParseException("Encountered unexpected closing " +
+          "bracket while parsing " + exprName + ".");
+    }
+    if (!filterListStack.isEmpty()) {
+      if (currentParseState == ParseState.PARSING_VALUE) {
+        setValueToCurrentFilter(
+            parseValue(expr.substring(kvStartOffset, offset)));
+        currentParseState = ParseState.PARSING_OP;
+      }
+      if (currentFilter != null) {
+        filterList.addFilter(currentFilter);
+      }
+      // As bracket is closing, pop the filter list from top of the stack and
+      // combine it with current filter list.
+      TimelineFilterList fList = filterListStack.pop();
+      if (fList != null) {
+        fList.addFilter(filterList);
+        filterList = fList;
+      }
+      currentFilter = null;
+      offset++;
+      kvStartOffset = offset;
+    } else {
+      throw new TimelineParseException("Encountered unexpected closing " +
+          "bracket while parsing " + exprName + ".");
+    }
+  }
+
+  private void parseCompareOp() throws TimelineParseException {
+    if (offset + 2 >= exprLength) {
+      throw new TimelineParseException("Compare op cannot be parsed for " +
+          exprName + ".");
+    }
+    TimelineCompareOp compareOp = null;
+    boolean keyExistFlag = true;
+    if (expr.charAt(offset + 2) == TimelineParseConstants.SPACE_CHAR) {
+      if (exprInLowerCase.startsWith("eq", offset)) {
+        compareOp = TimelineCompareOp.EQUAL;
+      } else if (exprInLowerCase.startsWith("ne", offset)) {
+        compareOp = TimelineCompareOp.NOT_EQUAL;
+        keyExistFlag = false;
+      } else if (exprInLowerCase.startsWith("lt", offset)) {
+        compareOp = TimelineCompareOp.LESS_THAN;
+      } else if (exprInLowerCase.startsWith("le", offset)) {
+        compareOp = TimelineCompareOp.LESS_OR_EQUAL;
+      } else if (exprInLowerCase.startsWith("gt", offset)) {
+        compareOp = TimelineCompareOp.GREATER_THAN;
+      } else if (exprInLowerCase.startsWith("ge", offset)) {
+        compareOp = TimelineCompareOp.GREATER_OR_EQUAL;
+      }
+      offset = offset + 3;
+    } else if (exprInLowerCase.startsWith("ene ", offset)) {
+      // Not equal but key should be present.
+      compareOp = TimelineCompareOp.NOT_EQUAL;
+      offset = offset + 4;
+    }
+    if (compareOp == null) {
+      throw new TimelineParseException("Compare op cannot be parsed for " +
+          exprName + ".");
+    }
+    setCompareOpToCurrentFilter(compareOp, keyExistFlag);
+    kvStartOffset = offset;
+    currentParseState = ParseState.PARSING_VALUE;
+  }
+
+  private void parseOp(boolean closingBracket) throws TimelineParseException {
+    Operator operator = null;
+    if (exprInLowerCase.startsWith("or ", offset)) {
+      operator = Operator.OR;
+      offset = offset + 3;
+    } else if (exprInLowerCase.startsWith("and ", offset)) {
+      operator = Operator.AND;
+      offset = offset + 4;
+    }
+    if (operator == null) {
+      throw new TimelineParseException("Operator cannot be parsed for " +
+          exprName + ".");
+    }
+    if (filterList == null) {
+      filterList = new TimelineFilterList(operator);
+    }
+    if (currentFilter != null) {
+      filterList.addFilter(currentFilter);
+    }
+    if (closingBracket || filterList.getOperator() != operator) {
+      filterList = new TimelineFilterList(operator, filterList);
+    }
+    currentFilter = null;
+    kvStartOffset = offset;
+    currentParseState = ParseState.PARSING_KEY;
+  }
+
+  /**
+   * Walks the expression character by character, dispatching on spaces,
+   * brackets and the current parse state, and returns the resulting filter
+   * list.
+   *
+   * @return the parsed filter list, or null if the expression is null or
+   *     empty.
+   * @throws TimelineParseException if the expression is malformed, including
+   *     the case where an opening bracket is never closed.
+   */
+  @Override
+  public TimelineFilterList parse() throws TimelineParseException {
+    if (expr == null || exprLength == 0) {
+      return null;
+    }
+    // Remembers that a bracket was closed since the last operator, so the
+    // next operator nests the finished group.
+    boolean sawClosingBracket = false;
+    while (offset < exprLength) {
+      final char current = expr.charAt(offset);
+      if (current == TimelineParseConstants.SPACE_CHAR) {
+        handleSpaceChar();
+      } else if (current == TimelineParseConstants.OPENING_BRACKET_CHAR) {
+        handleOpeningBracketChar();
+      } else if (current == TimelineParseConstants.CLOSING_BRACKET_CHAR) {
+        handleClosingBracketChar();
+        sawClosingBracket = true;
+      } else if (currentParseState == ParseState.PARSING_COMPAREOP) {
+        parseCompareOp();
+      } else if (currentParseState == ParseState.PARSING_OP) {
+        parseOp(sawClosingBracket);
+        sawClosingBracket = false;
+      } else {
+        // Might be a key or value. Move ahead.
+        offset++;
+      }
+    }
+    if (!filterListStack.isEmpty()) {
+      // At least one opening bracket was never closed.
+      filterListStack.clear();
+      throw new TimelineParseException("Encountered improper brackets while " +
+          "parsing " + exprName + ".");
+    }
+    if (currentParseState == ParseState.PARSING_VALUE) {
+      // The expression ended in the middle of a value; finish it.
+      setValueToCurrentFilter(
+          parseValue(expr.substring(kvStartOffset, offset)));
+    }
+    final boolean nothingCollected =
+        (filterList == null) || filterList.getFilterList().isEmpty();
+    if (nothingCollected) {
+      filterList = new TimelineFilterList(currentFilter);
+    } else if (currentFilter != null) {
+      filterList.addFilter(currentFilter);
+    }
+    return filterList;
+  }
+
+  /**
+   * Drops the references held by this parser: the current filter, the
+   * current filter list and any lists still waiting on the bracket stack.
+   */
+  @Override
+  public void close() {
+    currentFilter = null;
+    filterList = null;
+    if (filterListStack == null) {
+      return;
+    }
+    filterListStack.clear();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForDataToRetrieve.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForDataToRetrieve.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForDataToRetrieve.java
new file mode 100644
index 0000000..1e6039d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForDataToRetrieve.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+
+/**
+ * Used for parsing metrics or configs to retrieve.
+ *
+ * The expression is a list of prefixes separated by
+ * {@link TimelineParseConstants#COMMA_DELIMITER}. The list may be enclosed in
+ * brackets and a bracketed list may be preceded by "!". Without "!" the
+ * prefixes are turned into EQUAL prefix filters combined with OR; with "!"
+ * they are turned into NOT_EQUAL prefix filters combined with AND.
+ */
+@Private
+@Unstable
+public class TimelineParserForDataToRetrieve implements TimelineParser {
+  private static final String INVALID_EXPR_MSG =
+      "Invalid config/metric to retrieve " + "expression";
+
+  // Trimmed expression, or null if no expression was supplied. It is never
+  // modified after construction so that parse() can be called repeatedly.
+  private final String expr;
+  private final int exprLength;
+
+  public TimelineParserForDataToRetrieve(String expression) {
+    if (expression != null) {
+      expr = expression.trim();
+      exprLength = expr.length();
+    } else {
+      expr = null;
+      exprLength = 0;
+    }
+  }
+
+  /**
+   * Parses the expression into a list of prefix filters.
+   *
+   * @return list of prefix filters, or null if the expression is null, empty
+   *     or contains nothing between its brackets.
+   * @throws TimelineParseException if "!" is not directly followed (spaces
+   *     aside) by an opening bracket, if a "!" expression does not end with a
+   *     closing bracket, or if the opening bracket is preceded by anything
+   *     other than "!".
+   */
+  @Override
+  public TimelineFilterList parse() throws TimelineParseException {
+    if (expr == null || exprLength == 0) {
+      return null;
+    }
+    final TimelineCompareOp compareOp;
+    final int openingBracketIndex =
+        expr.indexOf(TimelineParseConstants.OPENING_BRACKET_CHAR);
+    if (expr.charAt(0) == TimelineParseConstants.NOT_CHAR) {
+      if (openingBracketIndex == -1) {
+        throw new TimelineParseException(INVALID_EXPR_MSG);
+      }
+      // Everything between "!" and the bracket (bracket included) must trim
+      // down to the bracket alone, i.e. only spaces may separate the two.
+      if (openingBracketIndex != 1 &&
+          expr.substring(1, openingBracketIndex + 1).trim().length() != 1) {
+        throw new TimelineParseException(INVALID_EXPR_MSG);
+      }
+      compareOp = TimelineCompareOp.NOT_EQUAL;
+    } else if (openingBracketIndex <= 0) {
+      // Either no bracket at all or the expression starts with one.
+      compareOp = TimelineCompareOp.EQUAL;
+    } else {
+      // Something other than "!" precedes the opening bracket. No compare op
+      // can be derived for such an expression, so reject it here instead of
+      // handing a null compare op to the prefix filters.
+      throw new TimelineParseException(INVALID_EXPR_MSG);
+    }
+    final boolean endsWithClosingBracket =
+        expr.charAt(exprLength - 1) ==
+            TimelineParseConstants.CLOSING_BRACKET_CHAR;
+    if (compareOp == TimelineCompareOp.NOT_EQUAL && !endsWithClosingBracket) {
+      throw new TimelineParseException(INVALID_EXPR_MSG);
+    }
+    // Work on a local copy: the field has to stay in sync with exprLength.
+    String values = expr;
+    if (openingBracketIndex != -1 && endsWithClosingBracket) {
+      values = expr.substring(openingBracketIndex + 1, exprLength - 1).trim();
+    }
+    if (values.isEmpty()) {
+      return null;
+    }
+    final Operator op = (compareOp == TimelineCompareOp.NOT_EQUAL) ?
+        Operator.AND : Operator.OR;
+    final TimelineFilterList list = new TimelineFilterList(op);
+    for (String split : values.split(TimelineParseConstants.COMMA_DELIMITER)) {
+      list.addFilter(new TimelinePrefixFilter(compareOp, split.trim()));
+    }
+    return list;
+  }
+
+  /**
+   * Nothing to release; this parser keeps no state besides the expression.
+   */
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForEqualityExpr.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForEqualityExpr.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForEqualityExpr.java
new file mode 100644
index 0000000..7451713
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForEqualityExpr.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.util.Deque;
+import java.util.LinkedList;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+
+/**
+ * Abstract class for parsing equality expressions. This means the values in
+ * expression would either be equal or not equal.
+ * Equality expressions are of the form :
+ * (&lt;value&gt;,&lt;value&gt;,&lt;value&gt;) &lt;op&gt; !(&lt;value&gt;,
+ * &lt;value&gt;)
+ *
+ * Here, "!" means all the values should not exist/should not be equal.
+ * If not specified, they should exist/be equal.
+ *
+ * op is a logical operator and can be either AND or OR.
+ *
+ * The way values will be interpreted would also depend on implementation.
+ *
+ * For instance for event filters this expression may look like,
+ * (event1,event2) AND !(event3,event4)
+ * This means for an entity to match, event1 and event2 should exist. But
+ * event3 and event4 should not exist.
+ */
+@Private
+@Unstable
+abstract class TimelineParserForEqualityExpr implements TimelineParser {
+  private enum ParseState {
+    PARSING_VALUE,
+    PARSING_OP,
+    PARSING_COMPAREOP
+  }
+  // Trimmed expression, or null if no expression was supplied.
+  private final String expr;
+  // Expression name.
+  private final String exprName;
+  // Expression offset.
+  private int offset = 0;
+  // Offset used to parse values in the expression.
+  private int startOffset = 0;
+  private final int exprLength;
+  private ParseState currentParseState = ParseState.PARSING_COMPAREOP;
+  private TimelineCompareOp currentCompareOp = null;
+  // Used to store filter lists which can then be combined as brackets are
+  // closed. Null entries are pushed when a bracket opens before any list
+  // exists, hence a LinkedList (which accepts null elements) is used.
+  private final Deque<TimelineFilterList> filterListStack =
+      new LinkedList<>();
+  private TimelineFilter currentFilter = null;
+  private TimelineFilterList filterList = null;
+  // Delimiter used to separate values.
+  private final char delimiter;
+
+  /**
+   * @param expression expression to parse; it is trimmed. May be null.
+   * @param name name of the expression, used in error messages.
+   * @param delim delimiter which separates values in the expression.
+   */
+  public TimelineParserForEqualityExpr(String expression, String name,
+      char delim) {
+    if (expression != null) {
+      expr = expression.trim();
+      exprLength = expr.length();
+    } else {
+      exprLength = 0;
+      expr = null;
+    }
+    exprName = name;
+    delimiter = delim;
+  }
+
+  protected TimelineFilter getCurrentFilter() {
+    return currentFilter;
+  }
+
+  protected TimelineFilter getFilterList() {
+    return filterList;
+  }
+
+  /**
+   * Creates filter as per implementation.
+   *
+   * @return a {@link TimelineFilter} implementation.
+   */
+  protected abstract TimelineFilter createFilter();
+
+  /**
+   * Sets compare op to the current filter as per filter implementation.
+   *
+   * @param compareOp compare op to be set.
+   * @throws TimelineParseException if any problem occurs.
+   */
+  protected abstract void setCompareOpToCurrentFilter(
+      TimelineCompareOp compareOp) throws TimelineParseException;
+
+  /**
+   * Sets value to the current filter as per filter implementation.
+   *
+   * @param value value to be set.
+   * @throws TimelineParseException if any problem occurs.
+   */
+  protected abstract void setValueToCurrentFilter(String value)
+      throws TimelineParseException;
+
+  /**
+   * Sets the trimmed text between startOffset and offset as value of the
+   * current filter, creating the filter first if required.
+   *
+   * @param checkIfNull if true, a new filter is created only when there is no
+   *     current filter; if false, a new filter is always created.
+   * @throws TimelineParseException if compare op or value cannot be set.
+   */
+  private void createAndSetFilter(boolean checkIfNull)
+      throws TimelineParseException {
+    if (!checkIfNull || currentFilter == null) {
+      currentFilter = createFilter();
+      setCompareOpToCurrentFilter(currentCompareOp);
+    }
+    setValueToCurrentFilter(expr.substring(startOffset, offset).trim());
+  }
+
+  /**
+   * Handles a space. While parsing a value, leading spaces are skipped and a
+   * space after at least one value character ends the value.
+   */
+  private void handleSpaceChar() throws TimelineParseException {
+    if (currentParseState == ParseState.PARSING_VALUE) {
+      if (startOffset == offset) {
+        startOffset++;
+      } else {
+        createAndSetFilter(true);
+        currentParseState = ParseState.PARSING_OP;
+      }
+    }
+    offset++;
+  }
+
+  /**
+   * Handles the value delimiter: completes the current value (if one is being
+   * parsed), adds the current filter to the filter list and gets ready to
+   * parse the next value.
+   *
+   * @throws TimelineParseException if a delimiter is not expected in the
+   *     current state.
+   */
+  private void handleDelimiter() throws TimelineParseException {
+    if (currentParseState == ParseState.PARSING_OP ||
+        currentParseState == ParseState.PARSING_VALUE) {
+      if (currentParseState == ParseState.PARSING_VALUE) {
+        createAndSetFilter(false);
+      }
+      if (filterList == null) {
+        filterList = new TimelineFilterList();
+      }
+      // Add parsed filter into filterlist and make it null to move on to next
+      // filter.
+      filterList.addFilter(currentFilter);
+      currentFilter = null;
+      offset++;
+      startOffset = offset;
+      currentParseState = ParseState.PARSING_VALUE;
+    } else {
+      throw new TimelineParseException("Invalid " + exprName +
+          " expression.");
+    }
+  }
+
+  /**
+   * Handles an opening bracket: saves the filter list built so far on the
+   * stack and starts parsing the values of the bracketed group.
+   *
+   * @param encounteredNot true if the bracket was preceded by "!", in which
+   *     case values of the group are compared with NOT_EQUAL instead of EQUAL.
+   * @throws TimelineParseException if a bracket is not expected in the current
+   *     state.
+   */
+  private void handleOpeningBracketChar(boolean encounteredNot)
+      throws TimelineParseException {
+    if (currentParseState == ParseState.PARSING_COMPAREOP ||
+        currentParseState == ParseState.PARSING_VALUE) {
+      offset++;
+      startOffset = offset;
+      filterListStack.push(filterList);
+      filterList = null;
+      if (currentFilter == null) {
+        currentFilter = createFilter();
+      }
+      currentCompareOp = encounteredNot ?
+          TimelineCompareOp.NOT_EQUAL : TimelineCompareOp.EQUAL;
+      setCompareOpToCurrentFilter(currentCompareOp);
+      currentParseState = ParseState.PARSING_VALUE;
+    } else {
+      throw new TimelineParseException("Encountered unexpected opening " +
+          "bracket while parsing " + exprName + ".");
+    }
+  }
+
+  /**
+   * Handles "!". It has to be followed, optionally after spaces, by an opening
+   * bracket.
+   *
+   * @throws TimelineParseException if "!" is not expected in the current state
+   *     or is not followed by an opening bracket.
+   */
+  private void handleNotChar() throws TimelineParseException {
+    if (currentParseState == ParseState.PARSING_COMPAREOP ||
+        currentParseState == ParseState.PARSING_VALUE) {
+      offset++;
+      while (offset < exprLength &&
+          expr.charAt(offset) == TimelineParseConstants.SPACE_CHAR) {
+        offset++;
+      }
+      if (offset == exprLength) {
+        throw new TimelineParseException("Invalid " + exprName +
+            " expression.");
+      }
+      if (expr.charAt(offset) == TimelineParseConstants.OPENING_BRACKET_CHAR) {
+        handleOpeningBracketChar(true);
+      } else {
+        throw new TimelineParseException("Invalid " + exprName +
+            " expression.");
+      }
+    } else {
+      throw new TimelineParseException("Encountered unexpected not(!) char " +
+         "while parsing " + exprName + ".");
+    }
+  }
+
+  /**
+   * Handles a closing bracket: completes the value being parsed (if any),
+   * adds the current filter to the list of the bracketed group and merges
+   * that list into the list saved when the matching bracket was opened.
+   *
+   * @throws TimelineParseException if a closing bracket is not expected in
+   *     the current state or has no matching opening bracket.
+   */
+  private void handleClosingBracketChar() throws TimelineParseException {
+    if (currentParseState != ParseState.PARSING_VALUE &&
+        currentParseState != ParseState.PARSING_OP) {
+      throw new TimelineParseException("Encountered unexpected closing " +
+          "bracket while parsing " + exprName + ".");
+    }
+    if (!filterListStack.isEmpty()) {
+      if (currentParseState == ParseState.PARSING_VALUE) {
+        if (startOffset != offset) {
+          createAndSetFilter(true);
+          currentParseState = ParseState.PARSING_OP;
+        }
+      }
+      if (filterList == null) {
+        filterList = new TimelineFilterList();
+      }
+      if (currentFilter != null) {
+        filterList.addFilter(currentFilter);
+      }
+      // As bracket is closing, pop the filter list from top of the stack and
+      // combine it with current filter list.
+      TimelineFilterList fList = filterListStack.pop();
+      if (fList != null) {
+        fList.addFilter(filterList);
+        filterList = fList;
+      }
+      currentFilter = null;
+      offset++;
+      startOffset = offset;
+    } else {
+      throw new TimelineParseException("Encountered unexpected closing " +
+          "bracket while parsing " + exprName + ".");
+    }
+  }
+
+  /**
+   * Parses the logical operator ("or " / "and ", case insensitive) at the
+   * current offset and adds the current filter to the filter list.
+   *
+   * @param closingBracket true if a closing bracket was seen since the last
+   *     operator; the current list is then nested inside a new list.
+   * @throws TimelineParseException if no operator can be recognised.
+   */
+  private void parseOp(boolean closingBracket) throws TimelineParseException {
+    Operator operator = null;
+    // Match against the expression itself, ignoring case, so that the offset
+    // always refers to the string it was computed on. A separately
+    // lower-cased copy may differ in length from the original.
+    if (expr.regionMatches(true, offset, "or ", 0, 3)) {
+      operator = Operator.OR;
+      offset = offset + 3;
+    } else if (expr.regionMatches(true, offset, "and ", 0, 4)) {
+      operator = Operator.AND;
+      offset = offset + 4;
+    }
+    if (operator == null) {
+      throw new TimelineParseException("Operator cannot be parsed for " +
+          exprName + ".");
+    }
+    if (filterList == null) {
+      filterList = new TimelineFilterList(operator);
+    }
+    if (currentFilter != null) {
+      filterList.addFilter(currentFilter);
+    }
+    if (closingBracket || filterList.getOperator() != operator) {
+      filterList = new TimelineFilterList(operator, filterList);
+    }
+    currentFilter = null;
+    startOffset = offset;
+    currentParseState = ParseState.PARSING_COMPAREOP;
+  }
+
+  /**
+   * Called for a value which is not enclosed in brackets. Such a value is
+   * compared with EQUAL. The offset is not advanced, so the same character is
+   * processed again as part of the value.
+   */
+  private void parseCompareOp() throws TimelineParseException {
+    if (currentFilter == null) {
+      currentFilter = createFilter();
+    }
+    currentCompareOp = TimelineCompareOp.EQUAL;
+    setCompareOpToCurrentFilter(currentCompareOp);
+    currentParseState = ParseState.PARSING_VALUE;
+  }
+
+  /**
+   * Parses the expression into a filter list.
+   *
+   * @return the parsed filter list, or null if the expression is null or
+   *     empty.
+   * @throws TimelineParseException if the expression is malformed, including
+   *     the case where an opening bracket is never closed.
+   */
+  @Override
+  public TimelineFilterList parse() throws TimelineParseException {
+    if (expr == null || exprLength == 0) {
+      return null;
+    }
+    boolean closingBracket = false;
+    while (offset < exprLength) {
+      char offsetChar = expr.charAt(offset);
+      switch(offsetChar) {
+      case TimelineParseConstants.NOT_CHAR:
+        handleNotChar();
+        break;
+      case TimelineParseConstants.SPACE_CHAR:
+        handleSpaceChar();
+        break;
+      case TimelineParseConstants.OPENING_BRACKET_CHAR:
+        handleOpeningBracketChar(false);
+        break;
+      case TimelineParseConstants.CLOSING_BRACKET_CHAR:
+        handleClosingBracketChar();
+        closingBracket = true;
+        break;
+      default: // other characters.
+        if (offsetChar == delimiter) {
+          handleDelimiter();
+        } else if (currentParseState == ParseState.PARSING_COMPAREOP) {
+          parseCompareOp();
+        } else if (currentParseState == ParseState.PARSING_OP) {
+          parseOp(closingBracket);
+          closingBracket = false;
+        } else {
+          // Part of a value. Move ahead.
+          offset++;
+        }
+        break;
+      }
+    }
+    if (!filterListStack.isEmpty()) {
+      filterListStack.clear();
+      throw new TimelineParseException("Encountered improper brackets while " +
+          "parsing " + exprName + ".");
+    }
+    if (currentParseState == ParseState.PARSING_VALUE) {
+      if (startOffset != offset) {
+        createAndSetFilter(true);
+      }
+    }
+    if (filterList == null || filterList.getFilterList().isEmpty()) {
+      filterList = new TimelineFilterList(currentFilter);
+    } else if (currentFilter != null) {
+      filterList.addFilter(currentFilter);
+    }
+    return filterList;
+  }
+
+  /**
+   * Drops the references held by this parser.
+   */
+  @Override
+  public void close() {
+    filterListStack.clear();
+    currentFilter = null;
+    filterList = null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForExistFilters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForExistFilters.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForExistFilters.java
new file mode 100644
index 0000000..8048c6e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForExistFilters.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+
+/**
+ * Used for parsing existence filters such as event filters. These filters
+ * check for existence of a value. For example, in case of event filters, they
+ * check if an event exists or not and accordingly return an entity.
+ */
+@Private
+@Unstable
+class TimelineParserForExistFilters extends TimelineParserForEqualityExpr {
+
+  /**
+   * @param expression expression to parse.
+   * @param delimiter delimiter which separates values in the expression.
+   */
+  public TimelineParserForExistFilters(String expression, char delimiter) {
+    super(expression, "Event Filter", delimiter);
+  }
+
+  /**
+   * @return a new, empty {@link TimelineExistsFilter}.
+   */
+  @Override
+  protected TimelineFilter createFilter() {
+    return new TimelineExistsFilter();
+  }
+
+  /**
+   * Sets the value on the current filter, which is expected to be a
+   * {@link TimelineExistsFilter}.
+   *
+   * @param value value to be set.
+   */
+  @Override
+  protected void setValueToCurrentFilter(String value) {
+    ((TimelineExistsFilter)getCurrentFilter()).setValue(value);
+  }
+
+  /**
+   * Sets the compare op on the current filter, which is expected to be a
+   * {@link TimelineExistsFilter}.
+   *
+   * @param compareOp compare op to be set.
+   */
+  @Override
+  protected void setCompareOpToCurrentFilter(TimelineCompareOp compareOp) {
+    ((TimelineExistsFilter)getCurrentFilter()).setCompareOp(compareOp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForKVFilters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForKVFilters.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForKVFilters.java
new file mode 100644
index 0000000..ec68bec
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForKVFilters.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+
+/**
+ * Parser for key-value filters, for instance config and info filters.
+ */
+@Private
+@Unstable
+class TimelineParserForKVFilters extends TimelineParserForCompareExpr {
+  // When set, values are kept as plain strings and never converted.
+  private final boolean valueAsString;
+  public TimelineParserForKVFilters(String expression, boolean valAsStr) {
+    super(expression, "Config/Info Filter");
+    this.valueAsString = valAsStr;
+  }
+
+  protected TimelineFilter createFilter() {
+    return new TimelineKeyValueFilter();
+  }
+
+  /**
+   * Converts the textual value of a filter. Unless values are to be treated
+   * as strings, the text is read through
+   * {@code GenericObjectMapper.OBJECT_READER}; if that fails the text itself
+   * is used.
+   *
+   * @param strValue value as it appears in the expression.
+   * @return the converted value, or strValue unchanged.
+   */
+  protected Object parseValue(String strValue) {
+    if (valueAsString) {
+      return strValue;
+    }
+    Object parsed;
+    try {
+      parsed = GenericObjectMapper.OBJECT_READER.readValue(strValue);
+    } catch (IOException ignored) {
+      // Could not be read as an object; fall back to the raw text.
+      parsed = strValue;
+    }
+    return parsed;
+  }
+
+  /**
+   * Sets compare op and key-exist flag on the current key-value filter.
+   *
+   * @throws TimelineParseException if compareOp is neither EQUAL nor
+   *     NOT_EQUAL.
+   */
+  @Override
+  protected void setCompareOpToCurrentFilter(TimelineCompareOp compareOp,
+      boolean keyMustExistFlag) throws TimelineParseException {
+    final boolean supported = (compareOp == TimelineCompareOp.EQUAL) ||
+        (compareOp == TimelineCompareOp.NOT_EQUAL);
+    if (!supported) {
+      throw new TimelineParseException("TimelineCompareOp for kv-filter " +
+          "should be EQUAL or NOT_EQUAL");
+    }
+    final TimelineKeyValueFilter kvFilter =
+        (TimelineKeyValueFilter) getCurrentFilter();
+    kvFilter.setCompareOp(compareOp, keyMustExistFlag);
+  }
+
+  /**
+   * Sets the value on the current key-value filter; does nothing when there
+   * is no current filter.
+   */
+  @Override
+  protected void setValueToCurrentFilter(Object value) {
+    final TimelineFilter filter = getCurrentFilter();
+    if (filter == null) {
+      return;
+    }
+    ((TimelineKeyValueFilter) filter).setValue(value);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to