http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
new file mode 100644
index 0000000..b4a0c74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowActivity table.
+ */
+public class TestHBaseStorageFlowActivity {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}
+   *
+   * Checks the flow run table contents
+   *
+   * The first entity has a created event, metrics and a finish event.
+   *
+ * The second entity has a created event and this is the entity with the
+ * smallest start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max
+ * end time. This should be the end time for the flow run.
+   *
+ * The fourth entity has a created event whose start time is greater than
+ * the min start time.
+   *
+ * The test also checks that the flow activity table has a single entry
+ * covering all 4 of these application entities, since they belong to the
+ * same flow run.
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime();
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime();
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow activity table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    Get g = new Get(startRow);
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
+        .getBytes());
+    assertEquals(1, values.size());
+    byte[] row = r1.getRow();
+    FlowActivityRowKey flowActivityRowKey =
+        FlowActivityRowKey.parseRowKey(row);
+    assertNotNull(flowActivityRowKey);
+    assertEquals(cluster, flowActivityRowKey.getClusterId());
+    assertEquals(user, flowActivityRowKey.getUserId());
+    assertEquals(flow, flowActivityRowKey.getFlowId());
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+    assertEquals(1, values.size());
+    checkFlowActivityRunId(runid, flowVersion, values);
+  }
+
+  /**
+   * Writes one application entity and checks the record for today in the
+   * flow activity table.
+   */
+  @Test
+  public void testWriteFlowActivityOneFlow() throws Exception {
+    String cluster = "testWriteFlowActivityOneFlow_cluster1";
+    String user = "testWriteFlowActivityOneFlow_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion = "A122110F135BC4";
+    Long runid = 1001111178919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_1111999999_1234";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+  }
+
+  private void checkFlowActivityTable(String cluster, String user, String flow,
+      String flowVersion, Long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+      assertEquals(1, values.size());
+      checkFlowActivityRunId(runid, flowVersion, values);
+    }
+    assertEquals(1, rowCount);
+  }
+
+  /**
+   * Writes 3 applications, each with a different run id and version, for the
+   * same {cluster, user, flow}.
+   *
+   * They should all be inserted into one record in the flow activity table,
+   * with 3 columns, one per run id.
+   */
+  @Test
+  public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
+    String cluster = "testManyRunsFlowActivity_cluster1";
+    String user = "testManyRunsFlowActivity_c_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion1 = "A122110F135BC4";
+    Long runid1 = 11111111111L;
+
+    String flowVersion2 = "A12222222222C4";
+    long runid2 = 2222222222222L;
+
+    String flowVersion3 = "A1333333333C4";
+    long runid3 = 3333333333333L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11888888888_1111";
+      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+      // write another application to this flow, but with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_2222";
+      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+      // write another application to this flow, but with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_3333";
+      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+        runid1, flowVersion2, runid2, flowVersion3, runid3);
+  }
+
+  private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+      String flow, Configuration c1, String flowVersion1, Long runid1,
+      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      assertEquals(3, values.size());
+      checkFlowActivityRunId(runid1, flowVersion1, values);
+      checkFlowActivityRunId(runid2, flowVersion2, values);
+      checkFlowActivityRunId(runid3, flowVersion3, values);
+    }
+    // the flow activity table is such that it will insert
+    // into current day's record
+    // hence, if this test runs across the midnight boundary,
+    // it may fail since it would insert into two records
+    // one for each day
+    assertEquals(1, rowCount);
+  }
+
+  private void checkFlowActivityRunId(Long runid, String flowVersion,
+      Map<byte[], byte[]> values) throws IOException {
+    byte[] rq = ColumnHelper.getColumnQualifier(
+        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+        GenericObjectMapper.write(runid));
+    boolean found = false;
+    for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+      String actualQ = Bytes.toString(k.getKey());
+      if (Bytes.toString(rq).equals(actualQ)) {
+        String actualV = (String) GenericObjectMapper.read(k.getValue());
+        assertEquals(flowVersion, actualV);
+        found = true;
+      }
+    }
+    // without this, a missing run id column would pass silently
+    assertTrue(found);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
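The TestFlowDataGenerator helpers used above (getEntity1, getEntityMinStartTime, getEntityMaxEndTime, and so on) are not part of this hunk. As a rough sketch only — the ids and timestamps below are illustrative assumptions, not the generator's actual code — an application entity carrying the created/finished events these tests rely on can be assembled like this:

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;

public class FlowEntitySketch {
  // Builds a YARN_APPLICATION entity with a created and a finished event;
  // the writer derives the flow run's min start / max end times from these.
  static TimelineEntity sketchEntity(long createdTs, long finishedTs) {
    TimelineEntity entity = new TimelineEntity();
    entity.setId("flowRunEntitySketch"); // arbitrary id for illustration
    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
    entity.setCreatedTime(createdTs);

    TimelineEvent created = new TimelineEvent();
    created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
    created.setTimestamp(createdTs);
    entity.addEvent(created);

    TimelineEvent finished = new TimelineEvent();
    finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
    finished.setTimestamp(finishedTs);
    entity.addEvent(finished);
    return entity;
  }
}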

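Both test classes also assert that the activity row key carries TimelineWriterUtils.getTopOfTheDayTimestamp(System.currentTimeMillis()). Assuming that utility is a plain UTC truncation to midnight (a guess about its internals; the authoritative version ships elsewhere in this patch), the arithmetic is:

public final class DayTimestampSketch {
  private static final long MILLIS_IN_DAY = 24L * 60 * 60 * 1000;

  // Drop the time-of-day component of an epoch-millis timestamp,
  // leaving midnight UTC of the same day.
  static long topOfTheDayTimestamp(long ts) {
    return ts - (ts % MILLIS_IN_DAY);
  }

  public static void main(String[] args) {
    System.out.println(topOfTheDayTimestamp(System.currentTimeMillis()));
  }
}

This day bucketing is also why the tests warn they can fail across a midnight boundary: writes before and after midnight land in two different rows.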
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
new file mode 100644
index 0000000..bf524ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun table.
+ */
+public class TestHBaseStorageFlowRun {
+
+  private static HBaseTestingUtility util;
+
+  private final String metric1 = "MAP_SLOT_MILLIS";
+  private final String metric2 = "HDFS_BYTES_READ";
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}
+   *
+   * Checks the flow run table contents
+   *
+   * The first entity has a created event, metrics and a finish event.
+   *
+ * The second entity has a created event and this is the entity with the
+ * smallest start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max
+ * end time. This should be the end time for the flow run.
+   *
+ * The fourth entity has a created event whose start time is greater than
+ * the min start time.
+   *
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime();
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime();
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow run table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    // scan the table and see that we get back the right min and max
+    // timestamps
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    Get g = new Get(startRow);
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+
+    assertEquals(2, r1.size());
+    Long starttime = (Long) GenericObjectMapper.read(values
+        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    Long expmin = entityMinStartTime.getCreatedTime();
+    assertEquals(expmin, starttime);
+    assertEquals(endTs, GenericObjectMapper.read(values
+        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+  }
+
+  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+    assertTrue(rowKeyComponents.length == 4);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    return true;
+  }
+
+  /**
+ * Writes two application entities of the same flow run. Each application has
+ * two metrics: slot millis and HDFS bytes read. Each metric has values at
+ * two timestamps.
+   *
+ * Checks the metric values of the flow in the flow run table. Flow metric
+ * values should be the sum of the individual metric values that belong to
+ * the latest timestamp for that metric.
+   */
+  @Test
+  public void testWriteFlowRunMetricsOneFlow() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with the same metrics to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+  }
+
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141, GenericObjectMapper.read(values.get(q)));
+
+      // check metric2
+      assertEquals(2, values.size());
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57, GenericObjectMapper.read(values.get(q)));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
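isFlowRunRowKeyCorrect above expects the run id portion of the row key to round-trip through TimelineWriterUtils.invert. A minimal sketch of the usual inversion trick, assuming a Long.MAX_VALUE subtraction (the actual util in this patch may differ) so that higher run ids sort first under HBase's ascending byte order:

public final class InvertSketch {
  // Invert a non-negative long so that larger values produce
  // lexicographically smaller row-key bytes (newest run first).
  static long invert(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long runid = 1002345678919L; // run id used in the tests above
    long inverted = invert(runid);
    // the transform is self-inverse, so readers can recover the run id
    assert invert(inverted) == runid; // run with -ea
    System.out.println(inverted);
  }
}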

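checkFlowRunTable expects the flow-level values 141 and 57: per the Javadoc, the sum across applications of each metric's value at its latest timestamp. That aggregation happens in the flow run storage layer, not in this test; the sketch below only restates the rule, with made-up per-app series whose latest values total 57 like metric2:

import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class FlowMetricAggregationSketch {
  // For each application's (timestamp -> value) series, take the value at
  // the latest timestamp, then sum those values across applications.
  static long aggregateLatest(List<TreeMap<Long, Long>> perAppSeries) {
    long sum = 0;
    for (TreeMap<Long, Long> series : perAppSeries) {
      sum += series.lastEntry().getValue();
    }
    return sum;
  }

  public static void main(String[] args) {
    TreeMap<Long, Long> app1 = new TreeMap<>();
    app1.put(100L, 2L);  // older value, ignored
    app1.put(200L, 40L); // latest value, counted
    TreeMap<Long, Long> app2 = new TreeMap<>();
    app2.put(200L, 17L);
    System.out.println(aggregateLatest(Arrays.asList(app1, app2))); // 57
  }
}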