Repository: hadoop Updated Branches: refs/heads/YARN-2928 b1960e0d2 -> 4b37985e6
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java new file mode 100644 index 0000000..b4a0c74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowActivity { + + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + /** + * Writes 4 timeline entities belonging to one flow run through the + * {@link HBaseTimelineWriterImpl} + * + * Checks the flow run table contents + * + * The first entity has a created event, metrics and a finish event. + * + * The second entity has a created event and this is the entity with smallest + * start time. This should be the start time for the flow run. + * + * The third entity has a finish event and this is the entity with the max end + * time. This should be the end time for the flow run. + * + * The fourth entity has a created event which has a start time that is + * greater than min start time. + * + * The test also checks in the flow activity table that one entry has been + * made for all of these 4 application entities since they belong to the same + * flow run. + */ + @Test + public void testWriteFlowRunMinMax() throws Exception { + + TimelineEntities te = new TimelineEntities(); + te.addEntity(TestFlowDataGenerator.getEntity1()); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; + String user = "testWriteFlowRunMinMaxToHBase_user1"; + String flow = "testing_flowRun_flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + String appName = "application_100000000000_1111"; + long endTs = 1439750690000L; + TimelineEntity entityMinStartTime = TestFlowDataGenerator + .getEntityMinStartTime(); + + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with the right min start time + te = new TimelineEntities(); + te.addEntity(entityMinStartTime); + appName = "application_100000000000_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity for max end time + TimelineEntity entityMaxEndTime = TestFlowDataGenerator + .getEntityMaxEndTime(endTs); + te = new TimelineEntities(); + te.addEntity(entityMaxEndTime); + appName = "application_100000000000_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity with greater start time + TimelineEntity entityGreaterStartTime = TestFlowDataGenerator + .getEntityGreaterStartTime(); + te = new TimelineEntities(); + te.addEntity(entityGreaterStartTime); + appName = "application_1000000000000000_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // flush everything to hbase + hbi.flush(); + } finally { + hbi.close(); + } + + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow activity table + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + Get g = new Get(startRow); + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO + .getBytes()); + assertEquals(1, values.size()); + byte[] row = r1.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkFlowActivityRunId(runid, flowVersion, values); + } + + /** + * Write 1 application entity and checks the record for today in the flow + * activity table + */ + @Test + public void testWriteFlowActivityOneFlow() throws Exception { + String cluster = "testWriteFlowActivityOneFlow_cluster1"; + String user = "testWriteFlowActivityOneFlow_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion = "A122110F135BC4"; + Long runid = 1001111178919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_1111999999_1234"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1); + } + + private void checkFlowActivityTable(String cluster, String user, String flow, + String flowVersion, Long runid, Configuration c1) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkFlowActivityRunId(runid, flowVersion, values); + } + assertEquals(1, rowCount); + } + + /** + * Writes 3 applications each with a different run id and version for the same + * {cluster, user, flow} + * + * They should be getting inserted into one record in the flow activity table + * with 3 columns, one per run id + */ + @Test + public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { + String cluster = "testManyRunsFlowActivity_cluster1"; + String user = "testManyRunsFlowActivity_c_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion1 = "A122110F135BC4"; + Long runid1 = 11111111111L; + + String flowVersion2 = "A12222222222C4"; + long runid2 = 2222222222222L; + + String flowVersion3 = "A1333333333C4"; + long runid3 = 3333333333333L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11888888888_1111"; + hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); + + // write an application with to this flow but a different runid/ version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_2222"; + hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); + + // write an application with to this flow but a different runid/ version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_3333"; + hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); + + hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, + runid1, flowVersion2, runid2, flowVersion3, runid3); + + } + + private void checkFlowActivityTableSeveralRuns(String cluster, String user, + String flow, Configuration c1, String flowVersion1, Long runid1, + String flowVersion2, Long runid2, String flowVersion3, Long runid3) + throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + assertEquals(3, values.size()); + checkFlowActivityRunId(runid1, flowVersion1, values); + checkFlowActivityRunId(runid2, flowVersion2, values); + checkFlowActivityRunId(runid3, flowVersion3, values); + } + // the flow activity table is such that it will insert + // into current day's record + // hence, if this test runs across the midnight boundary, + // it may fail since it would insert into two records + // one for each day + assertEquals(1, rowCount); + } + + private void checkFlowActivityRunId(Long runid, String flowVersion, + Map<byte[], byte[]> values) throws IOException { + byte[] rq = ColumnHelper.getColumnQualifier( + FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(), + GenericObjectMapper.write(runid)); + for (Map.Entry<byte[], byte[]> k : values.entrySet()) { + String actualQ = Bytes.toString(k.getKey()); + if (Bytes.toString(rq).equals(actualQ)) { + String actualV = (String) GenericObjectMapper.read(k.getValue()); + assertEquals(flowVersion, actualV); + } + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java new file mode 100644 index 0000000..bf524ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowRun { + + private static HBaseTestingUtility util; + + private final String metric1 = "MAP_SLOT_MILLIS"; + private final String metric2 = "HDFS_BYTES_READ"; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + /** + * Writes 4 timeline entities belonging to one flow run through the + * {@link HBaseTimelineWriterImpl} + * + * Checks the flow run table contents + * + * The first entity has a created event, metrics and a finish event. + * + * The second entity has a created event and this is the entity with smallest + * start time. This should be the start time for the flow run. + * + * The third entity has a finish event and this is the entity with the max end + * time. This should be the end time for the flow run. + * + * The fourth entity has a created event which has a start time that is + * greater than min start time. + * + */ + @Test + public void testWriteFlowRunMinMax() throws Exception { + + TimelineEntities te = new TimelineEntities(); + te.addEntity(TestFlowDataGenerator.getEntity1()); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; + String user = "testWriteFlowRunMinMaxToHBase_user1"; + String flow = "testing_flowRun_flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + String appName = "application_100000000000_1111"; + long endTs = 1439750690000L; + TimelineEntity entityMinStartTime = TestFlowDataGenerator + .getEntityMinStartTime(); + + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with the right min start time + te = new TimelineEntities(); + te.addEntity(entityMinStartTime); + appName = "application_100000000000_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity for max end time + TimelineEntity entityMaxEndTime = TestFlowDataGenerator + .getEntityMaxEndTime(endTs); + te = new TimelineEntities(); + te.addEntity(entityMaxEndTime); + appName = "application_100000000000_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity with greater start time + TimelineEntity entityGreaterStartTime = TestFlowDataGenerator + .getEntityGreaterStartTime(); + te = new TimelineEntities(); + te.addEntity(entityGreaterStartTime); + appName = "application_1000000000000000_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // flush everything to hbase + hbi.flush(); + } finally { + hbi.close(); + } + + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + + assertEquals(2, r1.size()); + Long starttime = (Long) GenericObjectMapper.read(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + Long expmin = entityMinStartTime.getCreatedTime(); + assertEquals(expmin, starttime); + assertEquals(endTs, GenericObjectMapper.read(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + } + + boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user, + String flow, Long runid) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); + assertTrue(rowKeyComponents.length == 4); + assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); + assertEquals(user, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(TimelineWriterUtils.invert(runid), + Bytes.toLong(rowKeyComponents[3])); + return true; + } + + /** + * Writes two application entities of the same flow run. Each application has + * two metrics: slot millis and hdfs bytes read. Each metric has values at two + * timestamps. + * + * Checks the metric values of the flow in the flow run table. Flow metric + * values should be the sum of individual metric values that belong to the + * latest timestamp for that metric + */ + @Test + public void testWriteFlowRunMetricsOneFlow() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + } + + private void checkFlowRunTable(String cluster, String user, String flow, + long runid, Configuration c1) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + rowCount++; + // check metric1 + byte[] q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); + assertTrue(values.containsKey(q)); + assertEquals(141, GenericObjectMapper.read(values.get(q))); + + // check metric2 + assertEquals(2, values.size()); + q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); + assertTrue(values.containsKey(q)); + assertEquals(57, GenericObjectMapper.read(values.get(q))); + } + assertEquals(1, rowCount); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +}