Repository: hadoop
Updated Branches:
  refs/heads/feature-YARN-2928 7c861634e -> cdb96df97


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 38c0f3f..21ddcc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -83,6 +83,18 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
     return columnPrefix;
   }
 
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
   public byte[] getColumnPrefixBytes() {
     return columnPrefixBytes.clone();
   }
@@ -112,8 +124,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
           + tableMutator.getName().getNameAsString());
     }
 
-    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifier);
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@@ -233,8 +244,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
           + tableMutator.getName().getNameAsString());
     }
 
-    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifier);
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,

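Note: the two getColumnPrefixBytes overloads added above centralize qualifier
construction, and both store() hunks in this file now route through them. A
usage sketch (illustrative only; RUN_ID mirrors a constant of this enum, and
ColumnHelper.getColumnQualifier is assumed to join the prefix and the
qualifier into a single HBase column qualifier):

    // Resolve the full column qualifier for a value kept under this prefix.
    byte[] full =
        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes("1002345678919");
    // A byte[] suffix takes the same path, useful when the remainder of the
    // qualifier is already encoded (e.g., an inverted timestamp).
    byte[] fromBytes = FlowActivityColumnPrefix.RUN_ID
        .getColumnPrefixBytes(new byte[] {0x00, 0x01});
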
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index eb055fe..e3bb52d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -89,8 +89,16 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
     return columnPrefixBytes.clone();
   }
 
-  public byte[] getColumnPrefixBytes(String qualifier) {
-    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
   }
 
   public byte[] getColumnFamilyBytes() {
@@ -121,8 +129,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
           + tableMutator.getName().getNameAsString());
     }
 
-    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifier);
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@@ -149,8 +156,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
           + tableMutator.getName().getNameAsString());
     }
 
-    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
-        this.columnPrefixBytes, qualifier);
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,

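Note: FlowRunColumnPrefix previously had only the String overload; the byte[]
variant is new, and both now carry @Override, which implies that ColumnPrefix
itself declares the pair (the interface change is not part of this excerpt,
so the sketch below is an assumption about its shape):

    // Assumed contract implied by the @Override annotations in this patch.
    public interface ColumnPrefix<T> {
      byte[] getColumnPrefixBytes(String qualifierPrefix);
      byte[] getColumnPrefixBytes(byte[] qualifierPrefix);
      // store()/read members elided.
    }
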
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
index 4e23e49..e864d61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -266,7 +266,7 @@ public class TestFileSystemTimelineReaderImpl {
     // only the id, created and modified time
     TimelineEntity result =
         reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
-            "app", "id_1", null);
+            "app", "id_1", null, null, null);
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -281,7 +281,7 @@ public class TestFileSystemTimelineReaderImpl {
     // Cluster and AppId should be enough to get an entity.
     TimelineEntity result =
         reader.getEntity(null, "cluster1", null, null, "app1",
-            "app", "id_1", null);
+            "app", "id_1", null, null, null);
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -298,7 +298,7 @@ public class TestFileSystemTimelineReaderImpl {
     // in app flow mapping csv has commas.
     TimelineEntity result =
         reader.getEntity(null, "cluster1", null, null, "app2",
-            "app", "id_5", null);
+            "app", "id_5", null, null, null);
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_5")).toString(),
         result.getIdentifier().toString());
@@ -311,7 +311,7 @@ public class TestFileSystemTimelineReaderImpl {
     // Specified fields in addition to default view will be returned.
     TimelineEntity result =
         reader.getEntity("user1", "cluster1", "flow1", 1L,
-            "app1", "app", "id_1",
+            "app1", "app", "id_1", null, null,
             EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
@@ -329,8 +329,8 @@ public class TestFileSystemTimelineReaderImpl {
   public void testGetEntityAllFields() throws Exception {
     // All fields of TimelineEntity will be returned.
     TimelineEntity result =
-        reader.getEntity("user1", "cluster1", "flow1", 1L,
-            "app1", "app", "id_1", EnumSet.of(Field.ALL));
+        reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app",
+        "id_1", null, null, EnumSet.of(Field.ALL));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -347,7 +347,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, null, null, null, null,
-            null, null);
+            null, null, null, null);
     // All 4 entities will be returned
     Assert.assertEquals(4, result.size());
   }
@@ -357,7 +357,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             2L, null, null, null, null, null, null, null, null, null,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(2, result.size());
     // Needs to be rewritten once hashcode and equals for
     // TimelineEntity is implemented
@@ -371,7 +371,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             3L, null, null, null, null, null, null, null, null, null,
-                null, null);
+                null, null, null, null);
      // Even though 2 entities out of 4 have same created time, one entity
      // is left out due to limit
      Assert.assertEquals(3, result.size());
@@ -383,7 +383,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, 1425016502030L, 1425016502060L, null, null, null, null, null,
-            null, null, null, null);
+            null, null, null, null, null, null);
     Assert.assertEquals(1, result.size());
     // Only one entity with ID id_4 should be returned.
     for (TimelineEntity entity : result) {
@@ -396,7 +396,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, 1425016502010L, null, null, null, null, null, null,
-            null, null, null);
+            null, null, null, null, null);
     Assert.assertEquals(3, result.size());
     for (TimelineEntity entity : result) {
       if (entity.getId().equals("id_4")) {
@@ -408,7 +408,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, 1425016502010L, null, null, null, null, null, null, null,
-            null, null, null);
+            null, null, null, null, null);
     Assert.assertEquals(1, result.size());
     for (TimelineEntity entity : result) {
       if (!entity.getId().equals("id_4")) {
@@ -420,7 +420,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, 1425016502090L, 1425016503020L, null, null, null,
-            null, null, null, null);
+            null, null, null, null, null, null);
     Assert.assertEquals(2, result.size());
     // Two entities with IDs' id_1 and id_4 should be returned.
     for (TimelineEntity entity : result) {
@@ -433,7 +433,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, 1425016502090L, null, null, null, null,
-            null, null, null);
+            null, null, null, null, null);
     Assert.assertEquals(2, result.size());
     for (TimelineEntity entity : result) {
       if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
@@ -445,7 +445,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, 1425016503005L, null, null, null, null, null,
-            null, null, null);
+            null, null, null, null, null);
     Assert.assertEquals(1, result.size());
     for (TimelineEntity entity : result) {
       if (!entity.getId().equals("id_4")) {
@@ -462,7 +462,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, null, infoFilters, null, null,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(1, result.size());
     // Only one entity with ID id_3 should be returned.
     for (TimelineEntity entity : result) {
@@ -478,7 +478,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, null, null, configFilters, null,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(2, result.size());
     for (TimelineEntity entity : result) {
       if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
@@ -493,7 +493,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, null, null, null, null,
-            eventFilters, null);
+            eventFilters, null, null, null);
     Assert.assertEquals(1, result.size());
     for (TimelineEntity entity : result) {
       if (!entity.getId().equals("id_3")) {
@@ -507,7 +507,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, null, null, null, metricFilters,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(2, result.size());
     // Two entities with IDs' id_1 and id_2 should be returned.
     for (TimelineEntity entity : result) {
@@ -527,7 +527,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, relatesTo, null, null, null, null,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(1, result.size());
     // Only one entity with ID id_1 should be returned.
     for (TimelineEntity entity : result) {
@@ -544,7 +544,7 @@ public class TestFileSystemTimelineReaderImpl {
     result =
         reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
             null, null, null, null, null, null, isRelatedTo, null, null, null,
-            null, null);
+            null, null, null, null);
     Assert.assertEquals(2, result.size());
     // Two entities with IDs' id_1 and id_3 should be returned.
     for (TimelineEntity entity : result) {

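Note: these test updates track a TimelineReader signature change: getEntity()
gains two arguments between the entity id and the field set, and getEntities()
grows from 18 to 20 arguments, with the new pair sitting between eventFilters
and fieldsToRetrieve. All call sites here pass null; judging by the HBase
tests later in this patch, the new slots carry TimelineFilterList instances
that restrict which configs and metrics are read back. A sketch under that
assumption (parameter names are illustrative):

    // Fetch only configs whose keys start with "cfg_"; leave metrics
    // unfiltered. null in either slot means "no restriction".
    TimelineFilterList confsToRetrieve = new TimelineFilterList(Operator.OR,
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
    TimelineEntity e = reader.getEntity("user1", "cluster1", "flow1", 1L,
        "app1", "app", "id_1", confsToRetrieve, null /* metricsToRetrieve */,
        EnumSet.of(Field.CONFIGS));
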
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 30ead40..bc7b3a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
 /**
  * Various tests to test writing entities to HBase and reading them back from
  * it.
@@ -79,18 +90,344 @@ import org.junit.Test;
 public class TestHBaseTimelineStorage {
 
   private static HBaseTestingUtility util;
+  private HBaseTimelineReaderImpl reader;
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     util = new HBaseTestingUtility();
     util.startMiniCluster();
     createSchema();
+    loadEntities();
+    loadApps();
   }
 
   private static void createSchema() throws IOException {
     TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
+  private static void loadApps() throws IOException {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "application_1111111111_2222";
+    entity.setId(id);
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    conf.put("cfg_param1", "value3");
+    entity.addConfigs(conf);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m12 = new TimelineMetric();
+    m12.setId("MAP1_BYTES");
+    m12.addValue(ts, 50);
+    metrics.add(m12);
+    entity.addMetrics(metrics);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event1");
+    event.setTimestamp(ts - 2000);
+    entity.addEvent(event);
+    te.addEntity(entity);
+
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    String id1 = "application_1111111111_3333";
+    entity1.setId(id1);
+    entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    entity1.setCreatedTime(cTime);
+    entity1.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap1 = new HashMap<String, Object>();
+    infoMap1.put("infoMapKey1", "infoMapValue1");
+    infoMap1.put("infoMapKey2", 10);
+    entity1.addInfo(infoMap1);
+
+    // add the isRelatedToEntity info
+    String key1 = "task";
+    String value1 = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet1 = new HashSet<String>();
+    isRelatedToSet1.add(value1);
+    Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+    isRelatedTo1.put(key, isRelatedToSet1);
+    entity1.setIsRelatedToEntities(isRelatedTo1);
+
+    // add the relatesTo info
+    key1 = "container";
+    value1 = "relates_to_entity_id_here";
+    Set<String> relatesToSet1 = new HashSet<String>();
+    relatesToSet1.add(value1);
+    value1 = "relates_to_entity_id_here_Second";
+    relatesToSet1.add(value1);
+    Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+    relatesTo1.put(key1, relatesToSet1);
+    entity1.setRelatesToEntities(relatesTo1);
+
+    // add some config entries
+    Map<String, String> conf1 = new HashMap<String, String>();
+    conf1.put("cfg_param1", "value1");
+    conf1.put("cfg_param2", "value2");
+    entity1.addConfigs(conf1);
+
+    // add metrics
+    Set<TimelineMetric> metrics1 = new HashSet<>();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP1_SLOT_MILLIS");
+    Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+    long ts1 = System.currentTimeMillis();
+    metricValues1.put(ts1 - 120000, 100000000);
+    metricValues1.put(ts1 - 100000, 200000000);
+    metricValues1.put(ts1 - 80000, 300000000);
+    metricValues1.put(ts1 - 60000, 400000000);
+    metricValues1.put(ts1 - 40000, 50000000000L);
+    metricValues1.put(ts1 - 20000, 60000000000L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues1);
+    metrics1.add(m2);
+    entity1.addMetrics(metrics1);
+    te1.addEntity(entity1);
+
+    TimelineEntities te2 = new TimelineEntities();
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "application_1111111111_4444";
+    entity2.setId(id2);
+    entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    entity2.setCreatedTime(cTime);
+    entity2.setModifiedTime(mTime);
+    te2.addEntity(entity2);
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+      hbi.init(util.getConfiguration());
+      hbi.start();
+      String cluster = "cluster1";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      String appName = "application_1111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      appName = "application_1111111111_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      appName = "application_1111111111_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
+      hbi.stop();
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+  private static void loadEntities() throws IOException {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    conf.put("cfg_param1", "value3");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m12 = new TimelineMetric();
+    m12.setId("MAP1_BYTES");
+    m12.addValue(ts, 50);
+    metrics.add(m12);
+    entity.addMetrics(metrics);
+    te.addEntity(entity);
+
+    TimelineEntity entity1 = new TimelineEntity();
+    String id1 = "hello1";
+    entity1.setId(id1);
+    entity1.setType(type);
+    entity1.setCreatedTime(cTime);
+    entity1.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap1 = new HashMap<String, Object>();
+    infoMap1.put("infoMapKey1", "infoMapValue1");
+    infoMap1.put("infoMapKey2", 10);
+    entity1.addInfo(infoMap1);
+
+    // add the isRelatedToEntity info
+    String key1 = "task";
+    String value1 = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet1 = new HashSet<String>();
+    isRelatedToSet1.add(value1);
+    Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+    isRelatedTo1.put(key, isRelatedToSet1);
+    entity1.setIsRelatedToEntities(isRelatedTo1);
+
+    // add the relatesTo info
+    key1 = "container";
+    value1 = "relates_to_entity_id_here";
+    Set<String> relatesToSet1 = new HashSet<String>();
+    relatesToSet1.add(value1);
+    value1 = "relates_to_entity_id_here_Second";
+    relatesToSet1.add(value1);
+    Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+    relatesTo1.put(key1, relatesToSet1);
+    entity1.setRelatesToEntities(relatesTo1);
+
+    // add some config entries
+    Map<String, String> conf1 = new HashMap<String, String>();
+    conf1.put("cfg_param1", "value1");
+    conf1.put("cfg_param2", "value2");
+    entity1.addConfigs(conf1);
+
+    // add metrics
+    Set<TimelineMetric> metrics1 = new HashSet<>();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP1_SLOT_MILLIS");
+    Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+    long ts1 = System.currentTimeMillis();
+    metricValues1.put(ts1 - 120000, 100000000);
+    metricValues1.put(ts1 - 100000, 200000000);
+    metricValues1.put(ts1 - 80000, 300000000);
+    metricValues1.put(ts1 - 60000, 400000000);
+    metricValues1.put(ts1 - 40000, 50000000000L);
+    metricValues1.put(ts1 - 20000, 60000000000L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues1);
+    metrics1.add(m2);
+    entity1.addMetrics(metrics1);
+    te.addEntity(entity1);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "hello2";
+    entity2.setId(id2);
+    entity2.setType(type);
+    entity2.setCreatedTime(cTime);
+    entity2.setModifiedTime(mTime);
+    te.addEntity(entity2);
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+        hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+        hbi.init(util.getConfiguration());
+        hbi.start();
+        String cluster = "cluster1";
+        String user = "user1";
+        String flow = "some_flow_name";
+        String flowVersion = "AB7822C10F1111";
+        long runid = 1002345678919L;
+        String appName = "application_1231111111_1111";
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+        hbi.stop();
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new HBaseTimelineReaderImpl();
+    reader.init(util.getConfiguration());
+    reader.start();
+  }
+
+  @After
+  public void stop() throws Exception {
+    if (reader != null) {
+      reader.stop();
+      reader.close();
+    }
+  }
+
   private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
     assertEquals(m1.size(), m2.size());
     for (Map.Entry<Long, Number> entry : m2.entrySet()) {
@@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_write_app";
       String user = "user1";
       String flow = "some_flow_name";
@@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage {
       matchMetrics(metricValues, metricMap);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
 
@@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
         hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_write_entity";
       String user = "user1";
       String flow = "some_flow_name";
@@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage {
       assertEquals(17, colCount);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+      Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+          null, null, null, null, null, null,
+          EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
         hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage {
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_events";
       String user = "user2";
       String flow = "other_flow_name";
@@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage {
       }
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertNotNull(e2);
@@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
         hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage {
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_empty_eventkey";
       String user = "user_emptyeventkey";
       String flow = "other_flow_name";
@@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage {
       assertEquals(1, rowCount);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+      Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+          null, null, null, null, null, null,
+          EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage {
     } finally {
       hbi.stop();
       hbi.close();
-      hbr.stop();;
-      hbr.close();
     }
   }
 
@@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage {
     }
   }
 
+  @Test
+  public void testReadEntities() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1231111111_1111","world", "hello", null,
+        null, EnumSet.of(Field.ALL));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(1, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, EnumSet.of(Field.ALL));
+    assertEquals(3, es1.size());
+  }
+
+  @Test
+  public void testReadEntitiesDefaultView() throws Exception {
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, null, null);
+    assertNotNull(e1);
+    assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+        e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+        e1.getRelatesToEntities().isEmpty());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null);
+    assertEquals(3, es1.size());
+    for (TimelineEntity e : es1) {
+      assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+          e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+          e.getRelatesToEntities().isEmpty());
+    }
+  }
+
+  @Test
+  public void testReadEntitiesByFields() throws Exception {
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(0, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+    assertEquals(3, es1.size());
+    int metricsCnt = 0;
+    int isRelatedToCnt = 0;
+    int infoCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricsCnt += entity.getMetrics().size();
+      isRelatedToCnt += entity.getIsRelatedToEntities().size();
+      infoCnt += entity.getInfo().size();
+    }
+    assertEquals(0, infoCnt);
+    assertEquals(2, isRelatedToCnt);
+    assertEquals(3, metricsCnt);
+  }
+
+  @Test
+  public void testReadEntitiesConfigPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", list, null, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getConfigs().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null,
+        list, null, null);
+    int cfgCnt = 0;
+    for (TimelineEntity entity : es1) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(3, cfgCnt);
+  }
+
+  @Test
+  public void testReadEntitiesConfigFilterPrefix() throws Exception {
+    Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, confFilters, null, null,
+        list, null, null);
+    assertEquals(1, entities.size());
+    int cfgCnt = 0;
+    for (TimelineEntity entity : entities) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(2, cfgCnt);
+  }
+
+  @Test
+  public void testReadEntitiesMetricPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, list, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getMetrics().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        list, null);
+    int metricCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(2, metricCnt);
+  }
+
+  @Test
+  public void testReadEntitiesMetricFilterPrefix() throws Exception {
+    Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, metricFilters,
+        null, null, list, null);
+    assertEquals(1, entities.size());
+    int metricCnt = 0;
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(1, metricCnt);
+  }
+
+  @Test
+  public void testReadApps() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+        EnumSet.of(Field.ALL));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(1, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null,
+        EnumSet.of(Field.ALL));
+    assertEquals(3, es1.size());
+  }
+
+  @Test
+  public void testReadAppsDefaultView() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null);
+    assertNotNull(e1);
+    assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+        e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+        e1.getRelatesToEntities().isEmpty());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null, null);
+    assertEquals(3, es1.size());
+    for (TimelineEntity e : es1) {
+      assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+          e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+          e.getRelatesToEntities().isEmpty());
+    }
+  }
+
+  @Test
+  public void testReadAppsByFields() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(0, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null,
+        EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+    assertEquals(3, es1.size());
+    int metricsCnt = 0;
+    int isRelatedToCnt = 0;
+    int infoCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricsCnt += entity.getMetrics().size();
+      isRelatedToCnt += entity.getIsRelatedToEntities().size();
+      infoCnt += entity.getInfo().size();
+    }
+    assertEquals(0, infoCnt);
+    assertEquals(2, isRelatedToCnt);
+    assertEquals(3, metricsCnt);
+  }
+
+  @Test
+  public void testReadAppsConfigPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getConfigs().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, list, null, null);
+    int cfgCnt = 0;
+    for (TimelineEntity entity : es1) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(3, cfgCnt);
+  }
+
+  @Test
+  public void testReadAppsConfigFilterPrefix() throws Exception {
+    Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, confFilters, null, null, list, null, null);
+    assertEquals(1, entities.size());
+    int cfgCnt = 0;
+    for (TimelineEntity entity : entities) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(2, cfgCnt);
+  }
+
+  @Test
+  public void testReadAppsMetricPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getMetrics().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, list, null);
+    int metricCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(2, metricCnt);
+  }
+
+  @Test
+  public void testReadAppsMetricFilterPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, metricFilters, null, null, list, null);
+    int metricCnt = 0;
+    assertEquals(1, entities.size());
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(1, metricCnt);
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();

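Note: the per-test HBaseTimelineReaderImpl instances give way to a shared
fixture: loadEntities() and loadApps() populate the mini-cluster once in
@BeforeClass, and @Before/@After open and close one reader per test method.
The prefix-filter assertions then follow directly from the fixture data; a
worked check for testReadEntitiesConfigPrefix, using the configs written in
loadEntities():

    // Entity "hello"  : config_param1, config_param2, cfg_param1 -> "cfg_" keeps 1
    // Entity "hello1" : cfg_param1, cfg_param2                   -> "cfg_" keeps 2
    // Entity "hello2" : no configs                               -> keeps 0
    // Hence assertEquals(1, e1.getConfigs().size()) for "hello" alone,
    // and cfgCnt == 1 + 2 + 0 == 3 across the three entities.
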
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index c957dad..434adac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -182,7 +182,7 @@ public class TestHBaseStorageFlowActivity {
       Set<TimelineEntity> entities =
           hbr.getEntities(null, cluster, null, null, null,
              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
-              null, null, null, null, null, null, null, null, null);
+              null, null, null, null, null, null, null, null, null, null, null);
       assertEquals(1, entities.size());
       for (TimelineEntity e : entities) {
         FlowActivityEntity flowActivity = (FlowActivityEntity)e;
@@ -238,7 +238,7 @@ public class TestHBaseStorageFlowActivity {
       Set<TimelineEntity> entities =
           hbr.getEntities(user, cluster, flow, null, null,
              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
-              null, null, null, null, null, null, null, null, null);
+              null, null, null, null, null, null, null, null, null, null, null);
       assertEquals(1, entities.size());
       for (TimelineEntity e : entities) {
         FlowActivityEntity entity = (FlowActivityEntity)e;
@@ -353,7 +353,7 @@ public class TestHBaseStorageFlowActivity {
       Set<TimelineEntity> entities =
           hbr.getEntities(null, cluster, null, null, null,
              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
-              null, null, null, null, null, null, null, null, null);
+              null, null, null, null, null, null, null, null, null, null, null);
       assertEquals(1, entities.size());
       for (TimelineEntity e : entities) {
         FlowActivityEntity flowActivity = (FlowActivityEntity)e;

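Note: the flow-activity hunks only widen getEntities() to the 20-argument
form; the two added nulls occupy the new retrieval-filter slots, so behavior
is unchanged. Shape of the widened call (slot names assumed, as above):

    Set<TimelineEntity> entities = hbr.getEntities(null, cluster, null, null,
        null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null,
        null, null, null, null, null, null, null, null, null, null,
        null /* confsToRetrieve */, null /* metricsToRetrieve */);
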
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 4fb8f0e..5da0192 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -178,9 +183,8 @@ public class TestHBaseStorageFlowRun {
       hbr.init(c1);
       hbr.start();
       // get the flow run entity
-      TimelineEntity entity =
-          hbr.getEntity(user, cluster, flow, runid, null,
-              TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
       assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
       FlowRunEntity flowRun = (FlowRunEntity)entity;
       assertEquals(minStartTs, flowRun.getStartTime());
@@ -238,9 +242,8 @@ public class TestHBaseStorageFlowRun {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
       hbr.start();
-      TimelineEntity entity =
-          hbr.getEntity(user, cluster, flow, runid, null,
-            TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
       assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
       Set<TimelineMetric> metrics = entity.getMetrics();
       assertEquals(2, metrics.size());
@@ -305,6 +308,181 @@ public class TestHBaseStorageFlowRun {
     assertEquals(1, rowCount);
   }
 
+  @Test
+  public void testWriteFlowRunMetricsPrefix() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with the same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineFilterList metricsToRetrieve =
+          new TimelineFilterList(new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
+          metric1.substring(0, metric1.indexOf("_") + 1)));
+      TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null,
+          metricsToRetrieve, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(1, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141L, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+
+      Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, 
runid,
+          null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+          null, null, null, null, null, null, null, null, null,
+          metricsToRetrieve, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity timelineEntity : entities) {
+        Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
+        assertEquals(1, timelineMetrics.size());
+        for (TimelineMetric metric : timelineMetrics) {
+          String id = metric.getId();
+          Map<Long, Number> values = metric.getValues();
+          assertEquals(1, values.size());
+          Number value = null;
+          for (Number n : values.values()) {
+            value = n;
+          }
+          switch (id) {
+          case metric1:
+            assertEquals(141L, value);
+            break;
+          default:
+            fail("unrecognized metric: " + id);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
+  }
+
+  @Test
+  public void testWriteFlowRunsMetricFields() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with the same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, 
runid,
+          null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+          null, null, null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity timelineEntity : entities) {
+        assertEquals(0, timelineEntity.getMetrics().size());
+      }
+
+      entities = hbr.getEntities(user, cluster, flow, runid,
+          null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+          null, null, null, null, null, null, null, null, null,
+          null, EnumSet.of(Field.METRICS));
+      assertEquals(1, entities.size());
+      for (TimelineEntity timelineEntity : entities) {
+        Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
+        assertEquals(2, timelineMetrics.size());
+        for (TimelineMetric metric : timelineMetrics) {
+          String id = metric.getId();
+          Map<Long, Number> values = metric.getValues();
+          assertEquals(1, values.size());
+          Number value = null;
+          for (Number n : values.values()) {
+            value = n;
+          }
+          switch (id) {
+          case metric1:
+            assertEquals(141L, value);
+            break;
+          case metric2:
+            assertEquals(57L, value);
+            break;
+          default:
+            fail("unrecognized metric: " + id);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
