http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java new file mode 100644 index 0000000..58d5e61 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; + +public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { + private static PhoenixOfflineAggregationWriterImpl storage; + private static final int BATCH_SIZE = 3; + + @BeforeClass + public static void setup() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + storage = setupPhoenixClusterAndWriterForTest(conf); + } + + @Test(timeout = 90000) + public void testFlowLevelAggregationStorage() throws Exception { + testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); + } + + @Test(timeout = 90000) + public void testUserLevelAggregationStorage() throws Exception { + testAggregator(OfflineAggregationInfo.USER_AGGREGATION); + } + + @AfterClass + public static void cleanup() throws Exception { + storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); + storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); + tearDownMiniCluster(); + } + + private static PhoenixOfflineAggregationWriterImpl + setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) + throws Exception{ + Map<String, String> props = new HashMap<>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, + Boolean.FALSE.toString()); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, + Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + // Change connection settings for test + conf.set( + YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, + getUrl()); + PhoenixOfflineAggregationWriterImpl + myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); + myWriter.init(conf); + myWriter.start(); + myWriter.createPhoenixTables(); + return myWriter; + } + + private static TimelineEntity getTestAggregationTimelineEntity() { + TimelineEntity entity = new TimelineEntity(); + String id = "hello1"; + String type = "testAggregationType"; + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + + TimelineMetric metric = new TimelineMetric(); + metric.setId("HDFS_BYTES_READ"); + metric.addValue(1425016501100L, 8000); + entity.addMetric(metric); + + return entity; + } + + private void testAggregator(OfflineAggregationInfo aggregationInfo) + throws Exception { + // Set up a list of timeline entities and write them back to Phoenix + int numEntity = 1; + TimelineEntities te = new TimelineEntities(); + te.addEntity(getTestAggregationTimelineEntity()); + TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", + "user1", "testFlow", null, 0L, null); + storage.writeAggregatedEntity(context, te, + aggregationInfo); + + // Verify if we're storing all entities + String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); + String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] + +") FROM " + aggregationInfo.getTableName(); + verifySQLWithCount(sql, numEntity, "Number of entities should be "); + // Check metric + sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " + + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; + verifySQLWithCount(sql, numEntity, + "Number of entities with info should be "); + } + + + private void verifySQLWithCount(String sql, int targetCount, String message) + throws Exception { + try ( + Statement stmt = + storage.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + assertTrue("Result set empty on statement " + sql, rs.next()); + assertNotNull("Fail to execute query " + sql, rs); + assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); + } catch (SQLException se) { + fail("SQL exception on query: " + sql + + " With exception message: " + se.getLocalizedMessage()); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java new file mode 100644 index 0000000..3b8036d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.conf.Configuration; + +/** + * Generates the data/entities for the FlowRun and FlowActivity Tables + */ +class TestFlowDataGenerator { + + private static final String metric1 = "MAP_SLOT_MILLIS"; + private static final String metric2 = "HDFS_BYTES_READ"; + public static final long END_TS_INCR = 10000L; + + static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; + + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*200000, 20L); + } + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = System.currentTimeMillis(); + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*100000, 31L); + } + + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; + + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = insertTs; + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(insertTs); + event.addInfo("done", "insertTs=" + insertTs); + entity.addEvent(event); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1(long insertTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; + metricValues.put(ts - 100000, 2L); + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = insertTs; + metricValues.put(ts - 100000, 31L); + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + long endTs = 1439379885000L; + event.setTimestamp(endTs); + String expKey = "foo_event_greater"; + String expVal = "test_app_greater"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + + static TimelineEntity getEntityMetricsApp2(long insertTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = insertTs; + metricValues.put(ts - 100000, 5L); + metricValues.put(ts - 80000, 101L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + long endTs = 1439379885000L; + event.setTimestamp(endTs); + String expKey = "foo_event_greater"; + String expVal = "test_app_greater"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getEntity1() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHello"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425026901000L; + entity.setCreatedTime(cTime); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(cTime); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + long expTs = cTime + 21600000;// start time + 6hrs + event.setTimestamp(expTs); + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + + static TimelineEntity getAFullEntity(long ts, long endTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunFullEntity"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(ts); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + metricValues.put(ts - 900000, 31L); + metricValues.put(ts - 30000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(ts); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + long expTs = ts + 21600000;// start time + 6hrs + event.setTimestamp(expTs); + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + + static TimelineEntity getEntityGreaterStartTime(long startTs) { + TimelineEntity entity = new TimelineEntity(); + entity.setCreatedTime(startTs); + entity.setId("flowRunHello with greater start time"); + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setType(type); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(startTs); + String expKey = "foo_event_greater"; + String expVal = "test_app_greater"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getEntityMaxEndTime(long endTs) { + TimelineEntity entity = new TimelineEntity(); + entity.setId("flowRunHello Max End time"); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(endTs); + String expKey = "foo_even_max_ finished"; + String expVal = "test_app_max_finished"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getEntityMinStartTime(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloMInStartTime"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(startTs); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getMinFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMin"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(startTs); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getMaxFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMax"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(startTs + END_TS_INCR); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getFlowApp1(long appCreatedTime) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowActivity_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(appCreatedTime); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(appCreatedTime); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java new file mode 100644 index 0000000..6b23b6c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowActivity { + + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + /** + * Writes 4 timeline entities belonging to one flow run through the + * {@link HBaseTimelineWriterImpl} + * + * Checks the flow run table contents + * + * The first entity has a created event, metrics and a finish event. + * + * The second entity has a created event and this is the entity with smallest + * start time. This should be the start time for the flow run. + * + * The third entity has a finish event and this is the entity with the max end + * time. This should be the end time for the flow run. + * + * The fourth entity has a created event which has a start time that is + * greater than min start time. + * + * The test also checks in the flow activity table that one entry has been + * made for all of these 4 application entities since they belong to the same + * flow run. + */ + @Test + public void testWriteFlowRunMinMax() throws Exception { + + TimelineEntities te = new TimelineEntities(); + te.addEntity(TestFlowDataGenerator.getEntity1()); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; + String user = "testWriteFlowRunMinMaxToHBase_user1"; + String flow = "testing_flowRun_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + String appName = "application_100000000000_1111"; + long minStartTs = 1424995200300L; + long greaterStartTs = 1424995200300L + 864000L; + long endTs = 1424995200300L + 86000000L;; + TimelineEntity entityMinStartTime = TestFlowDataGenerator + .getEntityMinStartTime(minStartTs); + + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with the right min start time + te = new TimelineEntities(); + te.addEntity(entityMinStartTime); + appName = "application_100000000000_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity for max end time + TimelineEntity entityMaxEndTime = TestFlowDataGenerator + .getEntityMaxEndTime(endTs); + te = new TimelineEntities(); + te.addEntity(entityMaxEndTime); + appName = "application_100000000000_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity with greater start time + TimelineEntity entityGreaterStartTime = TestFlowDataGenerator + .getEntityGreaterStartTime(greaterStartTs); + te = new TimelineEntities(); + te.addEntity(entityGreaterStartTime); + appName = "application_1000000000000000_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // flush everything to hbase + hbi.flush(); + } finally { + hbi.close(); + } + + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow activity table + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + byte[] startRow = + FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow); + Get g = new Get(startRow); + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO + .getBytes()); + assertEquals(1, values.size()); + byte[] row = r1.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowName()); + long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkFlowActivityRunId(runid, flowVersion, values); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + // get the flow activity entity + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, null, null, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters(10L, null, null, null, null, null, + null, null, null), + new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity e : entities) { + FlowActivityEntity flowActivity = (FlowActivityEntity)e; + assertEquals(cluster, flowActivity.getCluster()); + assertEquals(user, flowActivity.getUser()); + assertEquals(flow, flowActivity.getFlowName()); + assertEquals(dayTs, flowActivity.getDate().getTime()); + Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); + assertEquals(1, flowRuns.size()); + } + } finally { + hbr.close(); + } + } + + /** + * Write 1 application entity and checks the record for today in the flow + * activity table + */ + @Test + public void testWriteFlowActivityOneFlow() throws Exception { + String cluster = "testWriteFlowActivityOneFlow_cluster1"; + String user = "testWriteFlowActivityOneFlow_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion = "A122110F135BC4"; + long runid = 1001111178919L; + + TimelineEntities te = new TimelineEntities(); + long appCreatedTime = 1425016501000L; + TimelineEntity entityApp1 = + TestFlowDataGenerator.getFlowApp1(appCreatedTime); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_1111999999_1234"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1, + appCreatedTime); + + // use the reader to verify the data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters(10L, null, null, null, null, null, + null, null, null), + new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity e : entities) { + FlowActivityEntity entity = (FlowActivityEntity)e; + NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns(); + assertEquals(1, flowRuns.size()); + for (FlowRunEntity flowRun : flowRuns) { + assertEquals(runid, flowRun.getRunId()); + assertEquals(flowVersion, flowRun.getVersion()); + } + } + } finally { + hbr.close(); + } + } + + private void checkFlowActivityTable(String cluster, String user, String flow, + String flowVersion, long runid, Configuration c1, long appCreatedTime) + throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = + FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = + FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowName()); + long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkFlowActivityRunId(runid, flowVersion, values); + } + assertEquals(1, rowCount); + } + + /** + * Writes 3 applications each with a different run id and version for the same + * {cluster, user, flow} + * + * They should be getting inserted into one record in the flow activity table + * with 3 columns, one per run id + */ + @Test + public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { + String cluster = "testManyRunsFlowActivity_cluster1"; + String user = "testManyRunsFlowActivity_c_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion1 = "A122110F135BC4"; + long runid1 = 11111111111L; + + String flowVersion2 = "A12222222222C4"; + long runid2 = 2222222222222L; + + String flowVersion3 = "A1333333333C4"; + long runid3 = 3333333333333L; + + TimelineEntities te = new TimelineEntities(); + long appCreatedTime = 1425016501000L; + TimelineEntity entityApp1 = + TestFlowDataGenerator.getFlowApp1(appCreatedTime); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11888888888_1111"; + hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); + + // write an application with to this flow but a different runid/ version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_2222"; + hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); + + // write an application with to this flow but a different runid/ version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_3333"; + hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); + + hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, + runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, null, null, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters(10L, null, null, null, null, null, + null, null, null), + new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity e : entities) { + FlowActivityEntity flowActivity = (FlowActivityEntity)e; + assertEquals(cluster, flowActivity.getCluster()); + assertEquals(user, flowActivity.getUser()); + assertEquals(flow, flowActivity.getFlowName()); + long dayTs = + TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + assertEquals(dayTs, flowActivity.getDate().getTime()); + Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); + assertEquals(3, flowRuns.size()); + for (FlowRunEntity flowRun : flowRuns) { + long runId = flowRun.getRunId(); + String version = flowRun.getVersion(); + if (runId == runid1) { + assertEquals(flowVersion1, version); + } else if (runId == runid2) { + assertEquals(flowVersion2, version); + } else if (runId == runid3) { + assertEquals(flowVersion3, version); + } else { + fail("unknown run id: " + runId); + } + } + } + } finally { + hbr.close(); + } + } + + private void checkFlowActivityTableSeveralRuns(String cluster, String user, + String flow, Configuration c1, String flowVersion1, long runid1, + String flowVersion2, long runid2, String flowVersion3, long runid3, + long appCreatedTime) + throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = + FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = + FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowName()); + long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + assertEquals(3, values.size()); + checkFlowActivityRunId(runid1, flowVersion1, values); + checkFlowActivityRunId(runid2, flowVersion2, values); + checkFlowActivityRunId(runid3, flowVersion3, values); + } + // the flow activity table is such that it will insert + // into current day's record + // hence, if this test runs across the midnight boundary, + // it may fail since it would insert into two records + // one for each day + assertEquals(1, rowCount); + } + + private void checkFlowActivityRunId(long runid, String flowVersion, + Map<byte[], byte[]> values) throws IOException { + byte[] rq = ColumnHelper.getColumnQualifier( + FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(), + GenericObjectMapper.write(runid)); + for (Map.Entry<byte[], byte[]> k : values.entrySet()) { + String actualQ = Bytes.toString(k.getKey()); + if (Bytes.toString(rq).equals(actualQ)) { + String actualV = (String) GenericObjectMapper.read(k.getValue()); + assertEquals(flowVersion, actualV); + } + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java new file mode 100644 index 0000000..801d43c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -0,0 +1,851 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowRun { + + private static HBaseTestingUtility util; + + private final String metric1 = "MAP_SLOT_MILLIS"; + private final String metric2 = "HDFS_BYTES_READ"; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + @Test + public void checkCoProcessorOff() throws IOException, InterruptedException { + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Can't check tables since admin is null"); + } + if (admin.tableExists(table)) { + // check the regions. + // check in flow run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. + // check in flow activity table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. + // check in entity run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + } + + /** + * Writes 4 timeline entities belonging to one flow run through the + * {@link HBaseTimelineWriterImpl} + * + * Checks the flow run table contents + * + * The first entity has a created event, metrics and a finish event. + * + * The second entity has a created event and this is the entity with smallest + * start time. This should be the start time for the flow run. + * + * The third entity has a finish event and this is the entity with the max end + * time. This should be the end time for the flow run. + * + * The fourth entity has a created event which has a start time that is + * greater than min start time. + * + */ + @Test + public void testWriteFlowRunMinMax() throws Exception { + + TimelineEntities te = new TimelineEntities(); + te.addEntity(TestFlowDataGenerator.getEntity1()); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; + String user = "testWriteFlowRunMinMaxToHBase_user1"; + String flow = "testing_flowRun_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + String appName = "application_100000000000_1111"; + long minStartTs = 1425026900000L; + long greaterStartTs = 30000000000000L; + long endTs = 1439750690000L; + TimelineEntity entityMinStartTime = TestFlowDataGenerator + .getEntityMinStartTime(minStartTs); + + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with the right min start time + te = new TimelineEntities(); + te.addEntity(entityMinStartTime); + appName = "application_100000000000_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity for max end time + TimelineEntity entityMaxEndTime = TestFlowDataGenerator + .getEntityMaxEndTime(endTs); + te = new TimelineEntities(); + te.addEntity(entityMaxEndTime); + appName = "application_100000000000_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // writer another entity with greater start time + TimelineEntity entityGreaterStartTime = TestFlowDataGenerator + .getEntityGreaterStartTime(greaterStartTs); + te = new TimelineEntities(); + te.addEntity(entityGreaterStartTime); + appName = "application_1000000000000000_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // flush everything to hbase + hbi.flush(); + } finally { + hbi.close(); + } + + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + + assertEquals(2, r1.size()); + long starttime = Bytes.toLong(values.get( + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + assertEquals(minStartTs, starttime); + assertEquals(endTs, Bytes.toLong(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + // get the flow run entity + TimelineEntity entity = hbr.getEntity( + new TimelineReaderContext(cluster, user, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineDataToRetrieve()); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + FlowRunEntity flowRun = (FlowRunEntity)entity; + assertEquals(minStartTs, flowRun.getStartTime()); + assertEquals(endTs, flowRun.getMaxEndTime()); + } finally { + hbr.close(); + } + } + + /** + * Writes two application entities of the same flow run. Each application has + * two metrics: slot millis and hdfs bytes read. Each metric has values at two + * timestamps. + * + * Checks the metric values of the flow in the flow run table. Flow metric + * values should be the sum of individual metric values that belong to the + * latest timestamp for that metric + */ + @Test + public void testWriteFlowRunMetricsOneFlow() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + TimelineEntity entity = hbr.getEntity( + new TimelineReaderContext(cluster, user, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineDataToRetrieve()); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + Set<TimelineMetric> metrics = entity.getMetrics(); + assertEquals(2, metrics.size()); + for (TimelineMetric metric : metrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + case metric2: + assertEquals(57L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } finally { + hbr.close(); + } + } + + private void checkFlowRunTable(String cluster, String user, String flow, + long runid, Configuration c1) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + rowCount++; + // check metric1 + byte[] q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); + assertTrue(values.containsKey(q)); + assertEquals(141L, Bytes.toLong(values.get(q))); + + // check metric2 + assertEquals(3, values.size()); + q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); + assertTrue(values.containsKey(q)); + assertEquals(57L, Bytes.toLong(values.get(q))); + } + assertEquals(1, rowCount); + } + + @Test + public void testWriteFlowRunMetricsPrefix() throws Exception { + String cluster = "testWriteFlowRunMetricsPrefix_cluster1"; + String user = "testWriteFlowRunMetricsPrefix_user1"; + String flow = "testWriteFlowRunMetricsPrefix_flow_name"; + String flowVersion = "CF7022C10F1354"; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + TimelineFilterList metricsToRetrieve = new TimelineFilterList( + Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL, + metric1.substring(0, metric1.indexOf("_") + 1))); + TimelineEntity entity = hbr.getEntity( + new TimelineReaderContext(cluster, user, flow, 1002345678919L, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineDataToRetrieve(null, metricsToRetrieve, null)); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + Set<TimelineMetric> metrics = entity.getMetrics(); + assertEquals(1, metrics.size()); + for (TimelineMetric metric : metrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(40L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(), + new TimelineDataToRetrieve(null, metricsToRetrieve, null)); + assertEquals(2, entities.size()); + int metricCnt = 0; + for (TimelineEntity timelineEntity : entities) { + metricCnt += timelineEntity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } finally { + hbr.close(); + } + } + + @Test + public void testWriteFlowRunsMetricFields() throws Exception { + String cluster = "testWriteFlowRunsMetricFields_cluster1"; + String user = "testWriteFlowRunsMetricFields_user1"; + String flow = "testWriteFlowRunsMetricFields_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(), + new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + assertEquals(0, timelineEntity.getMetrics().size()); + } + + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); + assertEquals(2, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + case metric2: + assertEquals(57L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + + @Test + public void testWriteFlowRunFlush() throws Exception { + String cluster = "atestFlushFlowRun_cluster1"; + String user = "atestFlushFlowRun__user1"; + String flow = "atestFlushFlowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 20000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = 1449796654827L - count; + long minTS = insertTs + 1; + long startTs = insertTs; + Configuration c1 = util.getConfiguration(); + TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + TimelineEntity entityApp2 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + + for (int i = start; i < count; i++) { + String appName = "application_1060350000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + Thread.sleep(1); + + appName = "application_1001199480000_7" + appIdSuffix; + insertTs++; + appIdSuffix++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + if (i % 1000 == 0) { + hbi.flush(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, + runid, false); + } + } + } finally { + hbi.flush(); + hbi.close(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, + true); + } + } + + private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, + int count, String cluster, String user, String flow, long runid, + boolean checkMax) throws IOException { + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + int start = 10; + assertEquals(2, r1.size()); + long starttime = Bytes.toLong(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + assertEquals(minTS, starttime); + if (checkMax) { + assertEquals(startTs + 2 * (count - start) + + TestFlowDataGenerator.END_TS_INCR, + Bytes.toLong(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + } + } + + @Test + public void testFilterFlowRunsByCreatedTime() throws Exception { + String cluster = "cluster2"; + String user = "user2"; + String flow = "flow_name2"; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( + System.currentTimeMillis()); + entityApp1.setCreatedTime(1425016501000L); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, + "application_11111111111111_1111", te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( + System.currentTimeMillis()); + entityApp2.setCreatedTime(1425016502000L); + te.addEntity(entityApp2); + hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, + "application_11111111111111_2222", te); + hbi.flush(); + } finally { + hbi.close(); + } + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, + null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null, + null, null, null, null, null), new TimelineDataToRetrieve()); + assertEquals(2, entities.size()); + for (TimelineEntity entity : entities) { + if (!entity.getId().equals("user2@flow_name2/1002345678918") && + !entity.getId().equals("user2@flow_name2/1002345678919")) { + fail("Entities with flow runs 1002345678918 and 1002345678919" + + "should be present."); + } + } + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, 1425016501050L, null, null, null, + null, null, null, null), new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity entity : entities) { + if (!entity.getId().equals("user2@flow_name2/1002345678918")) { + fail("Entity with flow run 1002345678918 should be present."); + } + } + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, 1425016501050L, null, null, + null, null, null, null), new TimelineDataToRetrieve()); + assertEquals(1, entities.size()); + for (TimelineEntity entity : entities) { + if (!entity.getId().equals("user2@flow_name2/1002345678919")) { + fail("Entity with flow run 1002345678919 should be present."); + } + } + } finally { + hbr.close(); + } + } + + @Test + public void testMetricFilters() throws Exception { + String cluster = "cluster1"; + String user = "user1"; + String flow = "flow_name1"; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( + System.currentTimeMillis()); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, + "application_11111111111111_1111", te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( + System.currentTimeMillis()); + te.addEntity(entityApp2); + hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, + "application_11111111111111_2222", te); + hbi.flush(); + } finally { + hbi.close(); + } + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + TimelineFilterList list1 = new TimelineFilterList(); + list1.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101)); + TimelineFilterList list2 = new TimelineFilterList(); + list2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, metric1, 43)); + list2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.EQUAL, metric2, 57)); + TimelineFilterList metricFilterList = + new TimelineFilterList(Operator.OR, list1, list2); + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList, null), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); + assertEquals(2, entities.size()); + int metricCnt = 0; + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(3, metricCnt); + + TimelineFilterList metricFilterList1 = new TimelineFilterList( + new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, metric1, 127), + new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30)); + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList1, null), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); + assertEquals(1, entities.size()); + metricCnt = 0; + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + + TimelineFilterList metricFilterList2 = new TimelineFilterList( + new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32), + new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57)); + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList2, null), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); + assertEquals(0, entities.size()); + + TimelineFilterList metricFilterList3 = new TimelineFilterList( + new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32)); + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList3, null), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); + assertEquals(0, entities.size()); + + TimelineFilterList list3 = new TimelineFilterList(); + list3.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101)); + TimelineFilterList list4 = new TimelineFilterList(); + list4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, metric1, 43)); + list4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.EQUAL, metric2, 57)); + TimelineFilterList metricFilterList4 = + new TimelineFilterList(Operator.OR, list3, list4); + TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, + metric2.substring(0, metric2.indexOf("_") + 1))); + entities = hbr.getEntities( + new TimelineReaderContext(cluster, user, flow, null, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList4, null), + new TimelineDataToRetrieve(null, metricsToRetrieve, + EnumSet.of(Field.ALL))); + assertEquals(2, entities.size()); + metricCnt = 0; + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(1, metricCnt); + } finally { + hbr.close(); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org