http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..58d5e61
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
+  private static PhoenixOfflineAggregationWriterImpl storage;
+  private static final int BATCH_SIZE = 3;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    storage = setupPhoenixClusterAndWriterForTest(conf);
+  }
+
+  @Test(timeout = 90000)
+  public void testFlowLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
+  }
+
+  @Test(timeout = 90000)
+  public void testUserLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
+    storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
+    tearDownMiniCluster();
+  }
+
+  private static PhoenixOfflineAggregationWriterImpl
+    setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
+      throws Exception {
+    Map<String, String> props = new HashMap<>();
+    // Must update config before starting server
+    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+        Boolean.FALSE.toString());
+    props.put("java.security.krb5.realm", "");
+    props.put("java.security.krb5.kdc", "");
+    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
+        Boolean.FALSE.toString());
+    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+    // Use a small batch size to exercise multiple calls to reserve sequences
+    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+        Long.toString(BATCH_SIZE));
+    // Must update config before starting server
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+    // Change connection settings for test
+    conf.set(
+        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+        getUrl());
+    PhoenixOfflineAggregationWriterImpl
+        myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
+    myWriter.init(conf);
+    myWriter.start();
+    myWriter.createPhoenixTables();
+    return myWriter;
+  }
+
+  private static TimelineEntity getTestAggregationTimelineEntity() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello1";
+    String type = "testAggregationType";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId("HDFS_BYTES_READ");
+    metric.addValue(1425016501100L, 8000);
+    entity.addMetric(metric);
+
+    return entity;
+  }
+
+  private void testAggregator(OfflineAggregationInfo aggregationInfo)
+      throws Exception {
+    // Set up a list of timeline entities and write them back to Phoenix
+    int numEntity = 1;
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(getTestAggregationTimelineEntity());
+    TimelineCollectorContext context = new TimelineCollectorContext(
+        "cluster_1", "user1", "testFlow", null, 0L, null);
+    storage.writeAggregatedEntity(context, te,
+        aggregationInfo);
+
+    // Verify if we're storing all entities
+    String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
+    String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
+        +") FROM " + aggregationInfo.getTableName();
+    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+    // Check metric
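+    // Note: the parenthesized column list after the table name below is
+    // Phoenix's dynamic-column syntax; it declares the metric column's type
+    // at query time because metric columns are not part of the static schema.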
+    sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
+        + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
+    verifySQLWithCount(sql, numEntity,
+        "Number of entities with info should be ");
+  }
+
+  private void verifySQLWithCount(String sql, int targetCount, String message)
+      throws Exception {
+    try (Statement stmt = storage.getConnection().createStatement();
+        ResultSet rs = stmt.executeQuery(sql)) {
+      assertNotNull("Failed to execute query " + sql, rs);
+      assertTrue("Result set empty on statement " + sql, rs.next());
+      assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
+    } catch (SQLException se) {
+      fail("SQL exception on query: " + sql
+          + " With exception message: " + se.getLocalizedMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
new file mode 100644
index 0000000..3b8036d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Generates the data/entities for the FlowRun and FlowActivity tables.
+ */
+class TestFlowDataGenerator {
+
+  private static final String metric1 = "MAP_SLOT_MILLIS";
+  private static final String metric2 = "HDFS_BYTES_READ";
+  public static final long END_TS_INCR = 10000L;
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
+
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 200000, 20L);
+    }
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 100000, 31L);
+    }
+
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
+      Configuration c1) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
+
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = insertTs;
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(insertTs);
+    event.addInfo("done", "insertTs=" + insertTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
+    metricValues.put(ts - 100000, 2L);
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = insertTs;
+    metricValues.put(ts - 100000, 31L);
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    long endTs = 1439379885000L;
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp2(long insertTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    long endTs = 1439379885000L;
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntity1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHello";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000L);
+    metricValues.put(ts - 100000, 200000000L);
+    metricValues.put(ts - 80000, 300000000L);
+    metricValues.put(ts - 60000, 400000000L);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(cTime);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    long expTs = cTime + 21600000; // start time + 6hrs
+    event.setTimestamp(expTs);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+  static TimelineEntity getAFullEntity(long ts, long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunFullEntity";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(ts);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 120000, 100000000L);
+    metricValues.put(ts - 100000, 200000000L);
+    metricValues.put(ts - 80000, 300000000L);
+    metricValues.put(ts - 60000, 400000000L);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 900000, 31L);
+    metricValues.put(ts - 30000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(ts);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    long expTs = ts + 21600000; // start time + 6hrs
+    event.setTimestamp(expTs);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+  static TimelineEntity getEntityGreaterStartTime(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setCreatedTime(startTs);
+    entity.setId("flowRunHello with greater start time");
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setType(type);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(startTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMaxEndTime(long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("flowRunHello Max End time");
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(endTs);
+    String expKey = "foo_even_max_ finished";
+    String expVal = "test_app_max_finished";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMinStartTime(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloMInStartTime";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(startTs);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(startTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getMinFlushEntity(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloFlushEntityMin";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(startTs);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(startTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getMaxFlushEntity(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloFlushEntityMax";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(startTs);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(startTs + END_TS_INCR);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getFlowApp1(long appCreatedTime) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowActivity_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(appCreatedTime);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(appCreatedTime);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
new file mode 100644
index 0000000..6b23b6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity tables.
+ */
+public class TestHBaseStorageFlowActivity {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}
+   *
+   * Checks the flow activity table contents.
+   *
+   * The first entity has a created event, metrics and a finish event.
+   *
+   * The second entity has a created event and this is the entity with smallest
+   * start time. This should be the start time for the flow run.
+   *
+   * The third entity has a finish event and this is the entity with the
+   * max end time. This should be the end time for the flow run.
+   *
+   * The fourth entity has a created event which has a start time that is
+   * greater than min start time.
+   *
+   * The test also checks in the flow activity table that one entry has been
+   * made for all of these 4 application entities since they belong to the same
+   * flow run.
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long minStartTs = 1424995200300L;
+    long greaterStartTs = 1424995200300L + 864000L;
+    long endTs = 1424995200300L + 86000000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime(minStartTs);
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime(greaterStartTs);
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow activity table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
+    Get g = new Get(startRow);
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
+        .getBytes());
+    assertEquals(1, values.size());
+    byte[] row = r1.getRow();
+    FlowActivityRowKey flowActivityRowKey =
+        FlowActivityRowKey.parseRowKey(row);
+    assertNotNull(flowActivityRowKey);
+    assertEquals(cluster, flowActivityRowKey.getClusterId());
+    assertEquals(user, flowActivityRowKey.getUserId());
+    assertEquals(flow, flowActivityRowKey.getFlowName());
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
+    assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+    checkFlowActivityRunId(runid, flowVersion, values);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow activity entity
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, null, null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
+          new TimelineEntityFilters(10L, null, null, null, null, null,
+          null, null, null),
+          new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+      }
+    } finally {
+      hbr.close();
+    }
+  }
+
+  /**
+   * Writes one application entity and checks the record for today in the
+   * flow activity table.
+   */
+  @Test
+  public void testWriteFlowActivityOneFlow() throws Exception {
+    String cluster = "testWriteFlowActivityOneFlow_cluster1";
+    String user = "testWriteFlowActivityOneFlow_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion = "A122110F135BC4";
+    long runid = 1001111178919L;
+
+    TimelineEntities te = new TimelineEntities();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_1111999999_1234";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+    // check flow activity
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
+        appCreatedTime);
+
+    // use the reader to verify the data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
+          new TimelineEntityFilters(10L, null, null, null, null, null,
+          null, null, null),
+          new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity entity = (FlowActivityEntity)e;
+        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          assertEquals(runid, flowRun.getRunId());
+          assertEquals(flowVersion, flowRun.getVersion());
+        }
+      }
+    } finally {
+      hbr.close();
+    }
+  }
+
+  private void checkFlowActivityTable(String cluster, String user, String flow,
+      String flowVersion, long runid, Configuration c1, long appCreatedTime)
+          throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+    s.setStartRow(startRow);
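+    // A stop row with a suffixed cluster name bounds the scan (exclusively)
+    // to rows belonging to this cluster.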
+    String clusterStop = cluster + "1";
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowName());
+      long dayTs =
+          TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+      assertEquals(1, values.size());
+      checkFlowActivityRunId(runid, flowVersion, values);
+    }
+    assertEquals(1, rowCount);
+  }
+
+  /**
+   * Writes 3 applications, each with a different run id and version, for
+   * the same {cluster, user, flow}.
+   *
+   * They should be inserted into one record in the flow activity table
+   * with 3 columns, one per run id
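+   * (each run id becomes one RUN_ID-prefixed column qualifier whose cell
+   * value holds that run's flow version, as checkFlowActivityRunId below
+   * verifies).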
+   */
+  @Test
+  public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
+    String cluster = "testManyRunsFlowActivity_cluster1";
+    String user = "testManyRunsFlowActivity_c_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion1 = "A122110F135BC4";
+    long runid1 = 11111111111L;
+
+    String flowVersion2 = "A12222222222C4";
+    long runid2 = 2222222222222L;
+
+    String flowVersion3 = "A1333333333C4";
+    long runid3 = 3333333333333L;
+
+    TimelineEntities te = new TimelineEntities();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11888888888_1111";
+      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+      // write another application to this flow with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_2222";
+      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+      // write another application to this flow with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_3333";
+      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+    // check flow activity
+    checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+        runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, null, null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
+          new TimelineEntityFilters(10L, null, null, null, null, null,
+          null, null, null),
+          new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        long dayTs =
+            TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(3, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          long runId = flowRun.getRunId();
+          String version = flowRun.getVersion();
+          if (runId == runid1) {
+            assertEquals(flowVersion1, version);
+          } else if (runId == runid2) {
+            assertEquals(flowVersion2, version);
+          } else if (runId == runid3) {
+            assertEquals(flowVersion3, version);
+          } else {
+            fail("unknown run id: " + runId);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
+  }
+
+  private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+      String flow, Configuration c1, String flowVersion1, long runid1,
+      String flowVersion2, long runid2, String flowVersion3, long runid3,
+      long appCreatedTime)
+      throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowName());
+      long dayTs =
+          TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      assertEquals(3, values.size());
+      checkFlowActivityRunId(runid1, flowVersion1, values);
+      checkFlowActivityRunId(runid2, flowVersion2, values);
+      checkFlowActivityRunId(runid3, flowVersion3, values);
+    }
+    // The flow activity table inserts into the current day's record;
+    // hence, if this test runs across the midnight boundary, it may fail
+    // since the writes would land in two records, one for each day.
+    assertEquals(1, rowCount);
+  }
+
+  private void checkFlowActivityRunId(long runid, String flowVersion,
+      Map<byte[], byte[]> values) throws IOException {
+    byte[] rq = ColumnHelper.getColumnQualifier(
+        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+        GenericObjectMapper.write(runid));
+    for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+      String actualQ = Bytes.toString(k.getKey());
+      if (Bytes.toString(rq).equals(actualQ)) {
+        String actualV = (String) GenericObjectMapper.read(k.getValue());
+        assertEquals(flowVersion, actualV);
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
new file mode 100644
index 0000000..801d43c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -0,0 +1,851 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity tables.
+ */
+public class TestHBaseStorageFlowRun {
+
+  private static HBaseTestingUtility util;
+
+  private final String metric1 = "MAP_SLOT_MILLIS";
+  private final String metric2 = "HDFS_BYTES_READ";
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
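+  // Sanity-checks the coprocessor wiring by table: regions of the flow run
+  // table should be recognized by TimelineStorageUtils.isFlowRunTable, while
+  // flow activity and entity table regions should not be.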
+  @Test
+  public void checkCoProcessorOff() throws IOException, InterruptedException {
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = ConnectionFactory.createConnection(hbaseConf);
+    Admin admin = conn.getAdmin();
+    if (admin == null) {
+      throw new IOException("Can't check tables since admin is null");
+    }
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in flow run table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+
+    table = TableName.valueOf(hbaseConf.get(
+        FlowActivityTable.TABLE_NAME_CONF_NAME,
+        FlowActivityTable.DEFAULT_TABLE_NAME));
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in flow activity table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+
+    table = TableName.valueOf(hbaseConf.get(
+        EntityTable.TABLE_NAME_CONF_NAME,
+        EntityTable.DEFAULT_TABLE_NAME));
+    if (admin.tableExists(table)) {
+      // check the regions.
+      // check in entity table
+      util.waitUntilAllRegionsAssigned(table);
+      HRegionServer server = util.getRSForFirstRegionInTable(table);
+      List<HRegion> regions = server.getOnlineRegions(table);
+      for (HRegion region : regions) {
+        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
+            hbaseConf));
+      }
+    }
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}
+   *
+   * Checks the flow run table contents
+   *
+   * The first entity has a created event, metrics and a finish event.
+   *
+   * The second entity has a created event and this is the entity with smallest
+   * start time. This should be the start time for the flow run.
+   *
+   * The third entity has a finish event and this is the entity with the
+   * max end time. This should be the end time for the flow run.
+   *
+   * The fourth entity has a created event which has a start time that is
+   * greater than min start time.
+   *
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long minStartTs = 1425026900000L;
+    long greaterStartTs = 30000000000000L;
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime(minStartTs);
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime(greaterStartTs);
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow run table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    // scan the table and see that we get back the right min and max
+    // timestamps
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    Get g = new Get(startRow);
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+
+    assertEquals(2, r1.size());
+    long starttime = Bytes.toLong(values.get(
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    assertEquals(minStartTs, starttime);
+    assertEquals(endTs, Bytes.toLong(values
+        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow run entity
+      TimelineEntity entity = hbr.getEntity(
+          new TimelineReaderContext(cluster, user, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineDataToRetrieve());
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      FlowRunEntity flowRun = (FlowRunEntity)entity;
+      assertEquals(minStartTs, flowRun.getStartTime());
+      assertEquals(endTs, flowRun.getMaxEndTime());
+    } finally {
+      hbr.close();
+    }
+  }
+
+  /**
+   * Writes two application entities of the same flow run. Each application has
+   * two metrics: slot millis and hdfs bytes read. Each metric has values
+   * at two timestamps.
+   *
+   * Checks the metric values of the flow in the flow run table. Flow metric
+   * values should be the sum of individual metric values that belong to the
+   * latest timestamp for that metric.
+   */
+  @Test
+  public void testWriteFlowRunMetricsOneFlow() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity entity = hbr.getEntity(
+          new TimelineReaderContext(cluster, user, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineDataToRetrieve());
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(2, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141L, value);
+          break;
+        case metric2:
+          assertEquals(57L, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+    } finally {
+      if (hbr != null) {
+        hbr.close();
+      }
+    }
+  }
+
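+  /*
+   * Scans the flow run table directly (bypassing the reader) and verifies
+   * that exactly one aggregated row exists for this flow run, carrying the
+   * expected summed values for metric1 (141) and metric2 (57).
+   */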
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    s.setStartRow(startRow);
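+    // the scan's stop row is exclusive; suffixing the cluster name yields a
+    // key that sorts just past every row of this cluster/user/flow/run
+    // (assuming the row key separator byte sorts before '1')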
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141L, Bytes.toLong(values.get(q)));
+
+      // check metric2
+      assertEquals(3, values.size());
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57L, Bytes.toLong(values.get(q)));
+    }
+    scanner.close();
+    conn.close();
+    assertEquals(1, rowCount);
+  }
+
+  @Test
+  public void testWriteFlowRunMetricsPrefix() throws Exception {
+    String cluster = "testWriteFlowRunMetricsPrefix_cluster1";
+    String user = "testWriteFlowRunMetricsPrefix_user1";
+    String flow = "testWriteFlowRunMetricsPrefix_flow_name";
+    String flowVersion = "CF7022C10F1354";
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
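+      // retrieve only metrics whose id starts with metric1's prefix, i.e. the
+      // id up to and including the first '_', combined with an OR operator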
+      TimelineFilterList metricsToRetrieve = new TimelineFilterList(
+          Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
+              metric1.substring(0, metric1.indexOf("_") + 1)));
+      TimelineEntity entity = hbr.getEntity(
+          new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(1, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(40L, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(),
+          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
+      assertEquals(2, entities.size());
+      int metricCnt = 0;
+      for (TimelineEntity timelineEntity : entities) {
+        metricCnt += timelineEntity.getMetrics().size();
+      }
+      assertEquals(2, metricCnt);
+    } finally {
+      if (hbr != null) {
+        hbr.close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteFlowRunsMetricFields() throws Exception {
+    String cluster = "testWriteFlowRunsMetricFields_cluster1";
+    String user = "testWriteFlowRunsMetricFields_user1";
+    String flow = "testWriteFlowRunsMetricFields_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
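+      // with a default TimelineDataToRetrieve (no fields requested), the
+      // reader should not populate metrics on the returned entity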
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(),
+          new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity timelineEntity : entities) {
+        assertEquals(0, timelineEntity.getMetrics().size());
+      }
+
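+      // explicitly requesting the METRICS field should return the
+      // aggregated metric values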
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+      assertEquals(1, entities.size());
+      for (TimelineEntity timelineEntity : entities) {
+        Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
+        assertEquals(2, timelineMetrics.size());
+        for (TimelineMetric metric : timelineMetrics) {
+          String id = metric.getId();
+          Map<Long, Number> values = metric.getValues();
+          assertEquals(1, values.size());
+          Number value = null;
+          for (Number n : values.values()) {
+            value = n;
+          }
+          switch (id) {
+          case metric1:
+            assertEquals(141L, value);
+            break;
+          case metric2:
+            assertEquals(57L, value);
+            break;
+          default:
+            fail("unrecognized metric: " + id);
+          }
+        }
+      }
+    } finally {
+      if (hbr != null) {
+        hbr.close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteFlowRunFlush() throws Exception {
+    String cluster = "atestFlushFlowRun_cluster1";
+    String user = "atestFlushFlowRun_user1";
+    String flow = "atestFlushFlowRun_flow_name";
+    String flowVersion = "AF1021C19F1351";
+    long runid = 1449526652000L;
+
+    int start = 10;
+    int count = 20000;
+    int appIdSuffix = 1;
+    HBaseTimelineWriterImpl hbi = null;
+    long insertTs = 1449796654827L - count;
+    long minTS = insertTs + 1;
+    long startTs = insertTs;
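+    // each loop iteration below writes two apps and bumps insertTs twice, so
+    // the earliest written timestamp is minTS (= startTs + 1) and, after the
+    // loop, the max end time should be startTs + 2 * (count - start) +
+    // END_TS_INCR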
+    Configuration c1 = util.getConfiguration();
+    TimelineEntities te1 = null;
+    TimelineEntity entityApp1 = null;
+    TimelineEntity entityApp2 = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+
+      for (int i = start; i < count; i++) {
+        String appName = "application_1060350000000_" + appIdSuffix;
+        insertTs++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+        te1.addEntity(entityApp1);
+        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+        te1.addEntity(entityApp2);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        Thread.sleep(1);
+
+        appName = "application_1001199480000_7" + appIdSuffix;
+        insertTs++;
+        appIdSuffix++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+        te1.addEntity(entityApp1);
+        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+        te1.addEntity(entityApp2);
+
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
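+        // flush every 1000 iterations; the min start time is stable and can
+        // be checked mid-run, but the max end time keeps growing, so it is
+        // only verified after the final flush (checkMax == true)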
+        if (i % 1000 == 0) {
+          hbi.flush();
+          checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
+              runid, false);
+        }
+      }
+    } finally {
+      if (hbi != null) {
+        hbi.flush();
+        hbi.close();
+      }
+      checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
+          true);
+    }
+  }
+
+  private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
+      int count, String cluster, String user, String flow, long runid,
+      boolean checkMax) throws IOException {
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow run table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    // scan the table and see that we get back the right min and max
+    // timestamps
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    Get g = new Get(startRow);
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+    int start = 10; // must match the loop start used in testWriteFlowRunFlush
+    assertEquals(2, r1.size());
+    long starttime = Bytes.toLong(values
+        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    assertEquals(minTS, starttime);
+    if (checkMax) {
+      assertEquals(startTs + 2 * (count - start)
+          + TestFlowDataGenerator.END_TS_INCR,
+          Bytes.toLong(values
+          .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+    }
+    conn.close();
+  }
+
+  @Test
+  public void testFilterFlowRunsByCreatedTime() throws Exception {
+    String cluster = "cluster2";
+    String user = "user2";
+    String flow = "flow_name2";
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
+        System.currentTimeMillis());
+    entityApp1.setCreatedTime(1425016501000L);
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
+          "application_11111111111111_1111", te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
+          System.currentTimeMillis());
+      entityApp2.setCreatedTime(1425016502000L);
+      te.addEntity(entityApp2);
+      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
+          "application_11111111111111_2222", te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
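+      // both created times (1425016501000 and 1425016502000) fall inside the
+      // queried window, so both flow runs should be returned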
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow,
+          null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null,
+          null, null, null, null, null), new TimelineDataToRetrieve());
+      assertEquals(2, entities.size());
+      for (TimelineEntity entity : entities) {
+        if (!entity.getId().equals("user2@flow_name2/1002345678918") &&
+            !entity.getId().equals("user2@flow_name2/1002345678919")) {
+          fail("Entities with flow runs 1002345678918 and 1002345678919 " +
+              "should be present.");
+        }
+      }
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, 1425016501050L, null, null, null,
+          null, null, null, null), new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity entity : entities) {
+        if (!entity.getId().equals("user2@flow_name2/1002345678918")) {
+          fail("Entity with flow run 1002345678918 should be present.");
+        }
+      }
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, 1425016501050L, null, null,
+          null, null, null, null), new TimelineDataToRetrieve());
+      assertEquals(1, entities.size());
+      for (TimelineEntity entity : entities) {
+        if (!entity.getId().equals("user2@flow_name2/1002345678919")) {
+          fail("Entity with flow run 1002345678919 should be present.");
+        }
+      }
+    } finally {
+      if (hbr != null) {
+        hbr.close();
+      }
+    }
+  }
+
+  @Test
+  public void testMetricFilters() throws Exception {
+    String cluster = "cluster1";
+    String user = "user1";
+    String flow = "flow_name1";
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(
+        System.currentTimeMillis());
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
+          "application_11111111111111_1111", te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
+          System.currentTimeMillis());
+      te.addEntity(entityApp2);
+      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
+          "application_11111111111111_2222", te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
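+      // build (metric1 >= 101) OR (metric1 < 43 AND metric2 == 57); an inner
+      // TimelineFilterList combines its filters with AND by default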
+      TimelineFilterList list1 = new TimelineFilterList();
+      list1.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
+      TimelineFilterList list2 = new TimelineFilterList();
+      list2.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.LESS_THAN, metric1, 43));
+      list2.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.EQUAL, metric2, 57));
+      TimelineFilterList metricFilterList =
+          new TimelineFilterList(Operator.OR, list1, list2);
+      Set<TimelineEntity> entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null,
+          null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, null, null, null, null, null,
+          metricFilterList, null),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+      assertEquals(2, entities.size());
+      int metricCnt = 0;
+      for (TimelineEntity entity : entities) {
+        metricCnt += entity.getMetrics().size();
+      }
+      assertEquals(3, metricCnt);
+
+      TimelineFilterList metricFilterList1 = new TimelineFilterList(
+          new TimelineCompareFilter(
+          TimelineCompareOp.LESS_OR_EQUAL, metric1, 127),
+          new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30));
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, null, null, null, null, null,
+          metricFilterList1, null),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+      assertEquals(1, entities.size());
+      metricCnt = 0;
+      for (TimelineEntity entity : entities) {
+        metricCnt += entity.getMetrics().size();
+      }
+      assertEquals(2, metricCnt);
+
+      TimelineFilterList metricFilterList2 = new TimelineFilterList(
+          new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32),
+          new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57));
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, null, null, null, null, null,
+          metricFilterList2, null),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+      assertEquals(0, entities.size());
+
+      TimelineFilterList metricFilterList3 = new TimelineFilterList(
+          new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32));
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, null, null, null, null, null,
+          metricFilterList3, null),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+      assertEquals(0, entities.size());
+
+      TimelineFilterList list3 = new TimelineFilterList();
+      list3.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
+      TimelineFilterList list4 = new TimelineFilterList();
+      list4.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.LESS_THAN, metric1, 43));
+      list4.addFilter(new TimelineCompareFilter(
+          TimelineCompareOp.EQUAL, metric2, 57));
+      TimelineFilterList metricFilterList4 =
+          new TimelineFilterList(Operator.OR, list3, list4);
+      TimelineFilterList metricsToRetrieve =
+          new TimelineFilterList(Operator.OR,
+          new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
+          metric2.substring(0, metric2.indexOf("_") + 1)));
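+      // the runs are still matched against metricFilterList4 on both metrics,
+      // but only the metric2-prefixed metric is returned, so one metric in
+      // total survives across the two entities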
+      entities = hbr.getEntities(
+          new TimelineReaderContext(cluster, user, flow, null, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+          new TimelineEntityFilters(null, null, null, null, null, null, null,
+          metricFilterList4, null),
+          new TimelineDataToRetrieve(null, metricsToRetrieve,
+          EnumSet.of(Field.ALL)));
+      assertEquals(2, entities.size());
+      metricCnt = 0;
+      for (TimelineEntity entity : entities) {
+        metricCnt += entity.getMetrics().size();
+      }
+      assertEquals(1, metricCnt);
+    } finally {
+      if (hbr != null) {
+        hbr.close();
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}

