http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ffa5f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java deleted file mode 100644 index 58d5e61..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.ReadOnlyProps; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { - private static PhoenixOfflineAggregationWriterImpl storage; - private static final int BATCH_SIZE = 3; - - @BeforeClass - public static void setup() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - storage = setupPhoenixClusterAndWriterForTest(conf); - } - - @Test(timeout = 90000) - public void testFlowLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); - } - - @Test(timeout = 90000) - public void testUserLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.USER_AGGREGATION); - } - - @AfterClass - public static void cleanup() throws Exception { - storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); - storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); - tearDownMiniCluster(); - } - - private static PhoenixOfflineAggregationWriterImpl - setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) - throws Exception{ - Map<String, String> props = new HashMap<>(); - // Must update config before starting server - props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - Boolean.FALSE.toString()); - props.put("java.security.krb5.realm", ""); - props.put("java.security.krb5.kdc", ""); - props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, - Boolean.FALSE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); - props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - - // Change connection settings for test - conf.set( - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - getUrl()); - PhoenixOfflineAggregationWriterImpl - myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); - myWriter.init(conf); - myWriter.start(); - myWriter.createPhoenixTables(); - return myWriter; - } - - private static TimelineEntity getTestAggregationTimelineEntity() { - TimelineEntity entity = new TimelineEntity(); - String id = "hello1"; - String type = "testAggregationType"; - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(1425016501000L); - - TimelineMetric metric = new TimelineMetric(); - metric.setId("HDFS_BYTES_READ"); - metric.addValue(1425016501100L, 8000); - entity.addMetric(metric); - - return entity; - } - - private void testAggregator(OfflineAggregationInfo aggregationInfo) - throws Exception { - // Set up a list of timeline entities and write them back to Phoenix - int numEntity = 1; - TimelineEntities te = new TimelineEntities(); - te.addEntity(getTestAggregationTimelineEntity()); - TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", - "user1", "testFlow", null, 0L, null); - storage.writeAggregatedEntity(context, te, - aggregationInfo); - - // Verify if we're storing all entities - String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); - String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] - +") FROM " + aggregationInfo.getTableName(); - verifySQLWithCount(sql, numEntity, "Number of entities should be "); - // Check metric - sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " - + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; - verifySQLWithCount(sql, numEntity, - "Number of entities with info should be "); - } - - - private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception { - try ( - Statement stmt = - storage.getConnection().createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - assertTrue("Result set empty on statement " + sql, rs.next()); - assertNotNull("Fail to execute query " + sql, rs); - assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); - } catch (SQLException se) { - fail("SQL exception on query: " + sql - + " With exception message: " + se.getLocalizedMessage()); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ffa5f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java deleted file mode 100644 index 3b8036d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java +++ /dev/null @@ -1,383 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.conf.Configuration; - -/** - * Generates the data/entities for the FlowRun and FlowActivity Tables - */ -class TestFlowDataGenerator { - - private static final String metric1 = "MAP_SLOT_MILLIS"; - private static final String metric2 = "HDFS_BYTES_READ"; - public static final long END_TS_INCR = 10000L; - - static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunMetrics_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - long cTime = 1425016501000L; - entity.setCreatedTime(cTime); - - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = insertTs; - - for (int k=1; k< 100 ; k++) { - metricValues.put(ts - k*200000, 20L); - } - metricValues.put(ts - 80000, 40L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - - TimelineMetric m2 = new TimelineMetric(); - m2.setId(metric2); - metricValues = new HashMap<Long, Number>(); - ts = System.currentTimeMillis(); - for (int k=1; k< 100 ; k++) { - metricValues.put(ts - k*100000, 31L); - } - - metricValues.put(ts - 80000, 57L); - m2.setType(Type.TIME_SERIES); - m2.setValues(metricValues); - metrics.add(m2); - - entity.addMetrics(metrics); - return entity; - } - - - static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunMetrics_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - long cTime = 1425016501000L; - entity.setCreatedTime(cTime); - - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = insertTs; - - metricValues.put(ts - 80000, 40L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - - TimelineMetric m2 = new TimelineMetric(); - m2.setId(metric2); - metricValues = new HashMap<Long, Number>(); - ts = insertTs; - metricValues.put(ts - 80000, 57L); - m2.setType(Type.TIME_SERIES); - m2.setValues(metricValues); - metrics.add(m2); - - entity.addMetrics(metrics); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - event.setTimestamp(insertTs); - event.addInfo("done", "insertTs=" + insertTs); - entity.addEvent(event); - return entity; - } - - - static TimelineEntity getEntityMetricsApp1(long insertTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunMetrics_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - long cTime = 1425016501000L; - entity.setCreatedTime(cTime); - - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = insertTs; - metricValues.put(ts - 100000, 2L); - metricValues.put(ts - 80000, 40L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - - TimelineMetric m2 = new TimelineMetric(); - m2.setId(metric2); - metricValues = new HashMap<Long, Number>(); - ts = insertTs; - metricValues.put(ts - 100000, 31L); - metricValues.put(ts - 80000, 57L); - m2.setType(Type.TIME_SERIES); - m2.setValues(metricValues); - metrics.add(m2); - - entity.addMetrics(metrics); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - long endTs = 1439379885000L; - event.setTimestamp(endTs); - String expKey = "foo_event_greater"; - String expVal = "test_app_greater"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - return entity; - } - - - static TimelineEntity getEntityMetricsApp2(long insertTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunMetrics_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - long cTime = 1425016501000L; - entity.setCreatedTime(cTime); - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = insertTs; - metricValues.put(ts - 100000, 5L); - metricValues.put(ts - 80000, 101L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - entity.addMetrics(metrics); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - long endTs = 1439379885000L; - event.setTimestamp(endTs); - String expKey = "foo_event_greater"; - String expVal = "test_app_greater"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getEntity1() { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunHello"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - long cTime = 1425026901000L; - entity.setCreatedTime(cTime); - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - long ts = System.currentTimeMillis(); - metricValues.put(ts - 120000, 100000000L); - metricValues.put(ts - 100000, 200000000L); - metricValues.put(ts - 80000, 300000000L); - metricValues.put(ts - 60000, 400000000L); - metricValues.put(ts - 40000, 50000000000L); - metricValues.put(ts - 20000, 60000000000L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - entity.addMetrics(metrics); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(cTime); - String expKey = "foo_event"; - Object expVal = "test"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - - event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - long expTs = cTime + 21600000;// start time + 6hrs - event.setTimestamp(expTs); - event.addInfo(expKey, expVal); - entity.addEvent(event); - - return entity; - } - - static TimelineEntity getAFullEntity(long ts, long endTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunFullEntity"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(ts); - // add metrics - Set<TimelineMetric> metrics = new HashSet<>(); - TimelineMetric m1 = new TimelineMetric(); - m1.setId(metric1); - Map<Long, Number> metricValues = new HashMap<Long, Number>(); - metricValues.put(ts - 120000, 100000000L); - metricValues.put(ts - 100000, 200000000L); - metricValues.put(ts - 80000, 300000000L); - metricValues.put(ts - 60000, 400000000L); - metricValues.put(ts - 40000, 50000000000L); - metricValues.put(ts - 20000, 60000000000L); - m1.setType(Type.TIME_SERIES); - m1.setValues(metricValues); - metrics.add(m1); - TimelineMetric m2 = new TimelineMetric(); - m2.setId(metric2); - metricValues = new HashMap<Long, Number>(); - metricValues.put(ts - 900000, 31L); - metricValues.put(ts - 30000, 57L); - m2.setType(Type.TIME_SERIES); - m2.setValues(metricValues); - metrics.add(m2); - entity.addMetrics(metrics); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(ts); - String expKey = "foo_event"; - Object expVal = "test"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - - event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - long expTs = ts + 21600000;// start time + 6hrs - event.setTimestamp(expTs); - event.addInfo(expKey, expVal); - entity.addEvent(event); - - return entity; - } - - static TimelineEntity getEntityGreaterStartTime(long startTs) { - TimelineEntity entity = new TimelineEntity(); - entity.setCreatedTime(startTs); - entity.setId("flowRunHello with greater start time"); - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setType(type); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(startTs); - String expKey = "foo_event_greater"; - String expVal = "test_app_greater"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getEntityMaxEndTime(long endTs) { - TimelineEntity entity = new TimelineEntity(); - entity.setId("flowRunHello Max End time"); - entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - event.setTimestamp(endTs); - String expKey = "foo_even_max_ finished"; - String expVal = "test_app_max_finished"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getEntityMinStartTime(long startTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunHelloMInStartTime"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(startTs); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(startTs); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getMinFlushEntity(long startTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunHelloFlushEntityMin"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(startTs); - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(startTs); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getMaxFlushEntity(long startTs) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowRunHelloFlushEntityMax"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(startTs); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - event.setTimestamp(startTs + END_TS_INCR); - entity.addEvent(event); - return entity; - } - - static TimelineEntity getFlowApp1(long appCreatedTime) { - TimelineEntity entity = new TimelineEntity(); - String id = "flowActivity_test"; - String type = TimelineEntityType.YARN_APPLICATION.toString(); - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(appCreatedTime); - - TimelineEvent event = new TimelineEvent(); - event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - event.setTimestamp(appCreatedTime); - String expKey = "foo_event"; - Object expVal = "test"; - event.addInfo(expKey, expVal); - entity.addEvent(event); - - return entity; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ffa5f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java deleted file mode 100644 index 6b23b6c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ /dev/null @@ -1,469 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Tests the FlowRun and FlowActivity Tables - */ -public class TestHBaseStorageFlowActivity { - - private static HBaseTestingUtility util; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - createSchema(); - } - - private static void createSchema() throws IOException { - TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); - } - - /** - * Writes 4 timeline entities belonging to one flow run through the - * {@link HBaseTimelineWriterImpl} - * - * Checks the flow run table contents - * - * The first entity has a created event, metrics and a finish event. - * - * The second entity has a created event and this is the entity with smallest - * start time. This should be the start time for the flow run. - * - * The third entity has a finish event and this is the entity with the max end - * time. This should be the end time for the flow run. - * - * The fourth entity has a created event which has a start time that is - * greater than min start time. - * - * The test also checks in the flow activity table that one entry has been - * made for all of these 4 application entities since they belong to the same - * flow run. - */ - @Test - public void testWriteFlowRunMinMax() throws Exception { - - TimelineEntities te = new TimelineEntities(); - te.addEntity(TestFlowDataGenerator.getEntity1()); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; - String user = "testWriteFlowRunMinMaxToHBase_user1"; - String flow = "testing_flowRun_flow_name"; - String flowVersion = "CF7022C10F1354"; - long runid = 1002345678919L; - String appName = "application_100000000000_1111"; - long minStartTs = 1424995200300L; - long greaterStartTs = 1424995200300L + 864000L; - long endTs = 1424995200300L + 86000000L;; - TimelineEntity entityMinStartTime = TestFlowDataGenerator - .getEntityMinStartTime(minStartTs); - - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // write another entity with the right min start time - te = new TimelineEntities(); - te.addEntity(entityMinStartTime); - appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // writer another entity for max end time - TimelineEntity entityMaxEndTime = TestFlowDataGenerator - .getEntityMaxEndTime(endTs); - te = new TimelineEntities(); - te.addEntity(entityMaxEndTime); - appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // writer another entity with greater start time - TimelineEntity entityGreaterStartTime = TestFlowDataGenerator - .getEntityGreaterStartTime(greaterStartTs); - te = new TimelineEntities(); - te.addEntity(entityGreaterStartTime); - appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // flush everything to hbase - hbi.flush(); - } finally { - hbi.close(); - } - - Connection conn = ConnectionFactory.createConnection(c1); - // check in flow activity table - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); - byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow); - Get g = new Get(startRow); - Result r1 = table1.get(g); - assertNotNull(r1); - assertTrue(!r1.isEmpty()); - Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO - .getBytes()); - assertEquals(1, values.size()); - byte[] row = r1.getRow(); - FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row); - assertNotNull(flowActivityRowKey); - assertEquals(cluster, flowActivityRowKey.getClusterId()); - assertEquals(user, flowActivityRowKey.getUserId()); - assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); - assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); - assertEquals(1, values.size()); - checkFlowActivityRunId(runid, flowVersion, values); - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - // get the flow activity entity - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), - new TimelineEntityFilters(10L, null, null, null, null, null, - null, null, null), - new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity e : entities) { - FlowActivityEntity flowActivity = (FlowActivityEntity)e; - assertEquals(cluster, flowActivity.getCluster()); - assertEquals(user, flowActivity.getUser()); - assertEquals(flow, flowActivity.getFlowName()); - assertEquals(dayTs, flowActivity.getDate().getTime()); - Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); - assertEquals(1, flowRuns.size()); - } - } finally { - hbr.close(); - } - } - - /** - * Write 1 application entity and checks the record for today in the flow - * activity table - */ - @Test - public void testWriteFlowActivityOneFlow() throws Exception { - String cluster = "testWriteFlowActivityOneFlow_cluster1"; - String user = "testWriteFlowActivityOneFlow_user1"; - String flow = "flow_activity_test_flow_name"; - String flowVersion = "A122110F135BC4"; - long runid = 1001111178919L; - - TimelineEntities te = new TimelineEntities(); - long appCreatedTime = 1425016501000L; - TimelineEntity entityApp1 = - TestFlowDataGenerator.getFlowApp1(appCreatedTime); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_1111999999_1234"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - hbi.flush(); - } finally { - hbi.close(); - } - // check flow activity - checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1, - appCreatedTime); - - // use the reader to verify the data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), - new TimelineEntityFilters(10L, null, null, null, null, null, - null, null, null), - new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity e : entities) { - FlowActivityEntity entity = (FlowActivityEntity)e; - NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns(); - assertEquals(1, flowRuns.size()); - for (FlowRunEntity flowRun : flowRuns) { - assertEquals(runid, flowRun.getRunId()); - assertEquals(flowVersion, flowRun.getVersion()); - } - } - } finally { - hbr.close(); - } - } - - private void checkFlowActivityTable(String cluster, String user, String flow, - String flowVersion, long runid, Configuration c1, long appCreatedTime) - throws IOException { - Scan s = new Scan(); - s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); - byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); - s.setStartRow(startRow); - String clusterStop = cluster + "1"; - byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); - s.setStopRow(stopRow); - Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); - ResultScanner scanner = table1.getScanner(s); - int rowCount = 0; - for (Result result : scanner) { - assertNotNull(result); - assertTrue(!result.isEmpty()); - Map<byte[], byte[]> values = result - .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); - rowCount++; - byte[] row = result.getRow(); - FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey - .parseRowKey(row); - assertNotNull(flowActivityRowKey); - assertEquals(cluster, flowActivityRowKey.getClusterId()); - assertEquals(user, flowActivityRowKey.getUserId()); - assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); - assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); - assertEquals(1, values.size()); - checkFlowActivityRunId(runid, flowVersion, values); - } - assertEquals(1, rowCount); - } - - /** - * Writes 3 applications each with a different run id and version for the same - * {cluster, user, flow} - * - * They should be getting inserted into one record in the flow activity table - * with 3 columns, one per run id - */ - @Test - public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { - String cluster = "testManyRunsFlowActivity_cluster1"; - String user = "testManyRunsFlowActivity_c_user1"; - String flow = "flow_activity_test_flow_name"; - String flowVersion1 = "A122110F135BC4"; - long runid1 = 11111111111L; - - String flowVersion2 = "A12222222222C4"; - long runid2 = 2222222222222L; - - String flowVersion3 = "A1333333333C4"; - long runid3 = 3333333333333L; - - TimelineEntities te = new TimelineEntities(); - long appCreatedTime = 1425016501000L; - TimelineEntity entityApp1 = - TestFlowDataGenerator.getFlowApp1(appCreatedTime); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_11888888888_1111"; - hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); - - // write an application with to this flow but a different runid/ version - te = new TimelineEntities(); - te.addEntity(entityApp1); - appName = "application_11888888888_2222"; - hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); - - // write an application with to this flow but a different runid/ version - te = new TimelineEntities(); - te.addEntity(entityApp1); - appName = "application_11888888888_3333"; - hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); - - hbi.flush(); - } finally { - hbi.close(); - } - // check flow activity - checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, - runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime); - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), - new TimelineEntityFilters(10L, null, null, null, null, null, - null, null, null), - new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity e : entities) { - FlowActivityEntity flowActivity = (FlowActivityEntity)e; - assertEquals(cluster, flowActivity.getCluster()); - assertEquals(user, flowActivity.getUser()); - assertEquals(flow, flowActivity.getFlowName()); - long dayTs = - TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); - assertEquals(dayTs, flowActivity.getDate().getTime()); - Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); - assertEquals(3, flowRuns.size()); - for (FlowRunEntity flowRun : flowRuns) { - long runId = flowRun.getRunId(); - String version = flowRun.getVersion(); - if (runId == runid1) { - assertEquals(flowVersion1, version); - } else if (runId == runid2) { - assertEquals(flowVersion2, version); - } else if (runId == runid3) { - assertEquals(flowVersion3, version); - } else { - fail("unknown run id: " + runId); - } - } - } - } finally { - hbr.close(); - } - } - - private void checkFlowActivityTableSeveralRuns(String cluster, String user, - String flow, Configuration c1, String flowVersion1, long runid1, - String flowVersion2, long runid2, String flowVersion3, long runid3, - long appCreatedTime) - throws IOException { - Scan s = new Scan(); - s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); - byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); - s.setStartRow(startRow); - String clusterStop = cluster + "1"; - byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); - s.setStopRow(stopRow); - Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); - ResultScanner scanner = table1.getScanner(s); - int rowCount = 0; - for (Result result : scanner) { - assertNotNull(result); - assertTrue(!result.isEmpty()); - byte[] row = result.getRow(); - FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey - .parseRowKey(row); - assertNotNull(flowActivityRowKey); - assertEquals(cluster, flowActivityRowKey.getClusterId()); - assertEquals(user, flowActivityRowKey.getUserId()); - assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); - assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); - - Map<byte[], byte[]> values = result - .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); - rowCount++; - assertEquals(3, values.size()); - checkFlowActivityRunId(runid1, flowVersion1, values); - checkFlowActivityRunId(runid2, flowVersion2, values); - checkFlowActivityRunId(runid3, flowVersion3, values); - } - // the flow activity table is such that it will insert - // into current day's record - // hence, if this test runs across the midnight boundary, - // it may fail since it would insert into two records - // one for each day - assertEquals(1, rowCount); - } - - private void checkFlowActivityRunId(long runid, String flowVersion, - Map<byte[], byte[]> values) throws IOException { - byte[] rq = ColumnHelper.getColumnQualifier( - FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(), - GenericObjectMapper.write(runid)); - for (Map.Entry<byte[], byte[]> k : values.entrySet()) { - String actualQ = Bytes.toString(k.getKey()); - if (Bytes.toString(rq).equals(actualQ)) { - String actualV = (String) GenericObjectMapper.read(k.getValue()); - assertEquals(flowVersion, actualV); - } - } - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ffa5f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java deleted file mode 100644 index 801d43c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ /dev/null @@ -1,851 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; -import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Tests the FlowRun and FlowActivity Tables - */ -public class TestHBaseStorageFlowRun { - - private static HBaseTestingUtility util; - - private final String metric1 = "MAP_SLOT_MILLIS"; - private final String metric2 = "HDFS_BYTES_READ"; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - createSchema(); - } - - private static void createSchema() throws IOException { - TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); - } - - @Test - public void checkCoProcessorOff() throws IOException, InterruptedException { - Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); - Connection conn = null; - conn = ConnectionFactory.createConnection(hbaseConf); - Admin admin = conn.getAdmin(); - if (admin == null) { - throw new IOException("Can't check tables since admin is null"); - } - if (admin.tableExists(table)) { - // check the regions. - // check in flow run table - util.waitUntilAllRegionsAssigned(table); - HRegionServer server = util.getRSForFirstRegionInTable(table); - List<HRegion> regions = server.getOnlineRegions(table); - for (HRegion region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); - } - } - - table = TableName.valueOf(hbaseConf.get( - FlowActivityTable.TABLE_NAME_CONF_NAME, - FlowActivityTable.DEFAULT_TABLE_NAME)); - if (admin.tableExists(table)) { - // check the regions. - // check in flow activity table - util.waitUntilAllRegionsAssigned(table); - HRegionServer server = util.getRSForFirstRegionInTable(table); - List<HRegion> regions = server.getOnlineRegions(table); - for (HRegion region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); - } - } - - table = TableName.valueOf(hbaseConf.get( - EntityTable.TABLE_NAME_CONF_NAME, - EntityTable.DEFAULT_TABLE_NAME)); - if (admin.tableExists(table)) { - // check the regions. - // check in entity run table - util.waitUntilAllRegionsAssigned(table); - HRegionServer server = util.getRSForFirstRegionInTable(table); - List<HRegion> regions = server.getOnlineRegions(table); - for (HRegion region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); - } - } - } - - /** - * Writes 4 timeline entities belonging to one flow run through the - * {@link HBaseTimelineWriterImpl} - * - * Checks the flow run table contents - * - * The first entity has a created event, metrics and a finish event. - * - * The second entity has a created event and this is the entity with smallest - * start time. This should be the start time for the flow run. - * - * The third entity has a finish event and this is the entity with the max end - * time. This should be the end time for the flow run. - * - * The fourth entity has a created event which has a start time that is - * greater than min start time. - * - */ - @Test - public void testWriteFlowRunMinMax() throws Exception { - - TimelineEntities te = new TimelineEntities(); - te.addEntity(TestFlowDataGenerator.getEntity1()); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; - String user = "testWriteFlowRunMinMaxToHBase_user1"; - String flow = "testing_flowRun_flow_name"; - String flowVersion = "CF7022C10F1354"; - long runid = 1002345678919L; - String appName = "application_100000000000_1111"; - long minStartTs = 1425026900000L; - long greaterStartTs = 30000000000000L; - long endTs = 1439750690000L; - TimelineEntity entityMinStartTime = TestFlowDataGenerator - .getEntityMinStartTime(minStartTs); - - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // write another entity with the right min start time - te = new TimelineEntities(); - te.addEntity(entityMinStartTime); - appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // writer another entity for max end time - TimelineEntity entityMaxEndTime = TestFlowDataGenerator - .getEntityMaxEndTime(endTs); - te = new TimelineEntities(); - te.addEntity(entityMaxEndTime); - appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // writer another entity with greater start time - TimelineEntity entityGreaterStartTime = TestFlowDataGenerator - .getEntityGreaterStartTime(greaterStartTs); - te = new TimelineEntities(); - te.addEntity(entityGreaterStartTime); - appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - - // flush everything to hbase - hbi.flush(); - } finally { - hbi.close(); - } - - Connection conn = ConnectionFactory.createConnection(c1); - // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - // scan the table and see that we get back the right min and max - // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); - Get g = new Get(startRow); - g.addColumn(FlowRunColumnFamily.INFO.getBytes(), - FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); - g.addColumn(FlowRunColumnFamily.INFO.getBytes(), - FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); - Result r1 = table1.get(g); - assertNotNull(r1); - assertTrue(!r1.isEmpty()); - Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO - .getBytes()); - - assertEquals(2, r1.size()); - long starttime = Bytes.toLong(values.get( - FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); - assertEquals(minStartTs, starttime); - assertEquals(endTs, Bytes.toLong(values - .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - // get the flow run entity - TimelineEntity entity = hbr.getEntity( - new TimelineReaderContext(cluster, user, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineDataToRetrieve()); - assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); - FlowRunEntity flowRun = (FlowRunEntity)entity; - assertEquals(minStartTs, flowRun.getStartTime()); - assertEquals(endTs, flowRun.getMaxEndTime()); - } finally { - hbr.close(); - } - } - - /** - * Writes two application entities of the same flow run. Each application has - * two metrics: slot millis and hdfs bytes read. Each metric has values at two - * timestamps. - * - * Checks the metric values of the flow in the flow run table. Flow metric - * values should be the sum of individual metric values that belong to the - * latest timestamp for that metric - */ - @Test - public void testWriteFlowRunMetricsOneFlow() throws Exception { - String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; - String user = "testWriteFlowRunMetricsOneFlow_user1"; - String flow = "testing_flowRun_metrics_flow_name"; - String flowVersion = "CF7022C10F1354"; - long runid = 1002345678919L; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator - .getEntityMetricsApp1(System.currentTimeMillis()); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - // write another application with same metric to this flow - te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator - .getEntityMetricsApp2(System.currentTimeMillis()); - te.addEntity(entityApp2); - appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - hbi.flush(); - } finally { - hbi.close(); - } - - // check flow run - checkFlowRunTable(cluster, user, flow, runid, c1); - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - TimelineEntity entity = hbr.getEntity( - new TimelineReaderContext(cluster, user, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineDataToRetrieve()); - assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); - Set<TimelineMetric> metrics = entity.getMetrics(); - assertEquals(2, metrics.size()); - for (TimelineMetric metric : metrics) { - String id = metric.getId(); - Map<Long, Number> values = metric.getValues(); - assertEquals(1, values.size()); - Number value = null; - for (Number n : values.values()) { - value = n; - } - switch (id) { - case metric1: - assertEquals(141L, value); - break; - case metric2: - assertEquals(57L, value); - break; - default: - fail("unrecognized metric: " + id); - } - } - } finally { - hbr.close(); - } - } - - private void checkFlowRunTable(String cluster, String user, String flow, - long runid, Configuration c1) throws IOException { - Scan s = new Scan(); - s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); - s.setStartRow(startRow); - String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); - s.setStopRow(stopRow); - Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - ResultScanner scanner = table1.getScanner(s); - - int rowCount = 0; - for (Result result : scanner) { - assertNotNull(result); - assertTrue(!result.isEmpty()); - Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO - .getBytes()); - rowCount++; - // check metric1 - byte[] q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); - assertTrue(values.containsKey(q)); - assertEquals(141L, Bytes.toLong(values.get(q))); - - // check metric2 - assertEquals(3, values.size()); - q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); - assertTrue(values.containsKey(q)); - assertEquals(57L, Bytes.toLong(values.get(q))); - } - assertEquals(1, rowCount); - } - - @Test - public void testWriteFlowRunMetricsPrefix() throws Exception { - String cluster = "testWriteFlowRunMetricsPrefix_cluster1"; - String user = "testWriteFlowRunMetricsPrefix_user1"; - String flow = "testWriteFlowRunMetricsPrefix_flow_name"; - String flowVersion = "CF7022C10F1354"; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator - .getEntityMetricsApp1(System.currentTimeMillis()); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); - // write another application with same metric to this flow - te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator - .getEntityMetricsApp2(System.currentTimeMillis()); - te.addEntity(entityApp2); - appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); - hbi.flush(); - } finally { - hbi.close(); - } - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - TimelineFilterList metricsToRetrieve = new TimelineFilterList( - Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL, - metric1.substring(0, metric1.indexOf("_") + 1))); - TimelineEntity entity = hbr.getEntity( - new TimelineReaderContext(cluster, user, flow, 1002345678919L, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineDataToRetrieve(null, metricsToRetrieve, null)); - assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); - Set<TimelineMetric> metrics = entity.getMetrics(); - assertEquals(1, metrics.size()); - for (TimelineMetric metric : metrics) { - String id = metric.getId(); - Map<Long, Number> values = metric.getValues(); - assertEquals(1, values.size()); - Number value = null; - for (Number n : values.values()) { - value = n; - } - switch (id) { - case metric1: - assertEquals(40L, value); - break; - default: - fail("unrecognized metric: " + id); - } - } - - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(), - new TimelineDataToRetrieve(null, metricsToRetrieve, null)); - assertEquals(2, entities.size()); - int metricCnt = 0; - for (TimelineEntity timelineEntity : entities) { - metricCnt += timelineEntity.getMetrics().size(); - } - assertEquals(2, metricCnt); - } finally { - hbr.close(); - } - } - - @Test - public void testWriteFlowRunsMetricFields() throws Exception { - String cluster = "testWriteFlowRunsMetricFields_cluster1"; - String user = "testWriteFlowRunsMetricFields_user1"; - String flow = "testWriteFlowRunsMetricFields_flow_name"; - String flowVersion = "CF7022C10F1354"; - long runid = 1002345678919L; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator - .getEntityMetricsApp1(System.currentTimeMillis()); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - // write another application with same metric to this flow - te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator - .getEntityMetricsApp2(System.currentTimeMillis()); - te.addEntity(entityApp2); - appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - hbi.flush(); - } finally { - hbi.close(); - } - - // check flow run - checkFlowRunTable(cluster, user, flow, runid, c1); - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(), - new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity timelineEntity : entities) { - assertEquals(0, timelineEntity.getMetrics().size()); - } - - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); - assertEquals(1, entities.size()); - for (TimelineEntity timelineEntity : entities) { - Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); - assertEquals(2, timelineMetrics.size()); - for (TimelineMetric metric : timelineMetrics) { - String id = metric.getId(); - Map<Long, Number> values = metric.getValues(); - assertEquals(1, values.size()); - Number value = null; - for (Number n : values.values()) { - value = n; - } - switch (id) { - case metric1: - assertEquals(141L, value); - break; - case metric2: - assertEquals(57L, value); - break; - default: - fail("unrecognized metric: " + id); - } - } - } - } finally { - hbr.close(); - } - } - - @Test - public void testWriteFlowRunFlush() throws Exception { - String cluster = "atestFlushFlowRun_cluster1"; - String user = "atestFlushFlowRun__user1"; - String flow = "atestFlushFlowRun_flow_name"; - String flowVersion = "AF1021C19F1351"; - long runid = 1449526652000L; - - int start = 10; - int count = 20000; - int appIdSuffix = 1; - HBaseTimelineWriterImpl hbi = null; - long insertTs = 1449796654827L - count; - long minTS = insertTs + 1; - long startTs = insertTs; - Configuration c1 = util.getConfiguration(); - TimelineEntities te1 = null; - TimelineEntity entityApp1 = null; - TimelineEntity entityApp2 = null; - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - - for (int i = start; i < count; i++) { - String appName = "application_1060350000000_" + appIdSuffix; - insertTs++; - te1 = new TimelineEntities(); - entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); - te1.addEntity(entityApp1); - entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); - te1.addEntity(entityApp2); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - Thread.sleep(1); - - appName = "application_1001199480000_7" + appIdSuffix; - insertTs++; - appIdSuffix++; - te1 = new TimelineEntities(); - entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); - te1.addEntity(entityApp1); - entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); - te1.addEntity(entityApp2); - - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - if (i % 1000 == 0) { - hbi.flush(); - checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, - runid, false); - } - } - } finally { - hbi.flush(); - hbi.close(); - checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, - true); - } - } - - private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, - int count, String cluster, String user, String flow, long runid, - boolean checkMax) throws IOException { - Connection conn = ConnectionFactory.createConnection(c1); - // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - // scan the table and see that we get back the right min and max - // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); - Get g = new Get(startRow); - g.addColumn(FlowRunColumnFamily.INFO.getBytes(), - FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); - g.addColumn(FlowRunColumnFamily.INFO.getBytes(), - FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); - - Result r1 = table1.get(g); - assertNotNull(r1); - assertTrue(!r1.isEmpty()); - Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO - .getBytes()); - int start = 10; - assertEquals(2, r1.size()); - long starttime = Bytes.toLong(values - .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); - assertEquals(minTS, starttime); - if (checkMax) { - assertEquals(startTs + 2 * (count - start) - + TestFlowDataGenerator.END_TS_INCR, - Bytes.toLong(values - .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); - } - } - - @Test - public void testFilterFlowRunsByCreatedTime() throws Exception { - String cluster = "cluster2"; - String user = "user2"; - String flow = "flow_name2"; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( - System.currentTimeMillis()); - entityApp1.setCreatedTime(1425016501000L); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); - // write another application with same metric to this flow - te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( - System.currentTimeMillis()); - entityApp2.setCreatedTime(1425016502000L); - te.addEntity(entityApp2); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); - hbi.flush(); - } finally { - hbi.close(); - } - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, - null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, 1425016501000L, 1425016502001L, null, - null, null, null, null, null), new TimelineDataToRetrieve()); - assertEquals(2, entities.size()); - for (TimelineEntity entity : entities) { - if (!entity.getId().equals("user2@flow_name2/1002345678918") && - !entity.getId().equals("user2@flow_name2/1002345678919")) { - fail("Entities with flow runs 1002345678918 and 1002345678919" + - "should be present."); - } - } - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, 1425016501050L, null, null, null, - null, null, null, null), new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity entity : entities) { - if (!entity.getId().equals("user2@flow_name2/1002345678918")) { - fail("Entity with flow run 1002345678918 should be present."); - } - } - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, 1425016501050L, null, null, - null, null, null, null), new TimelineDataToRetrieve()); - assertEquals(1, entities.size()); - for (TimelineEntity entity : entities) { - if (!entity.getId().equals("user2@flow_name2/1002345678919")) { - fail("Entity with flow run 1002345678919 should be present."); - } - } - } finally { - hbr.close(); - } - } - - @Test - public void testMetricFilters() throws Exception { - String cluster = "cluster1"; - String user = "user1"; - String flow = "flow_name1"; - - TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1( - System.currentTimeMillis()); - te.addEntity(entityApp1); - - HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); - // write another application with same metric to this flow - te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( - System.currentTimeMillis()); - te.addEntity(entityApp2); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); - hbi.flush(); - } finally { - hbi.close(); - } - - // use the timeline reader to verify data - HBaseTimelineReaderImpl hbr = null; - try { - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); - - TimelineFilterList list1 = new TimelineFilterList(); - list1.addFilter(new TimelineCompareFilter( - TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101)); - TimelineFilterList list2 = new TimelineFilterList(); - list2.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_THAN, metric1, 43)); - list2.addFilter(new TimelineCompareFilter( - TimelineCompareOp.EQUAL, metric2, 57)); - TimelineFilterList metricFilterList = - new TimelineFilterList(Operator.OR, list1, list2); - Set<TimelineEntity> entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, - null, TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList, null), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); - assertEquals(2, entities.size()); - int metricCnt = 0; - for (TimelineEntity entity : entities) { - metricCnt += entity.getMetrics().size(); - } - assertEquals(3, metricCnt); - - TimelineFilterList metricFilterList1 = new TimelineFilterList( - new TimelineCompareFilter( - TimelineCompareOp.LESS_OR_EQUAL, metric1, 127), - new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30)); - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList1, null), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); - assertEquals(1, entities.size()); - metricCnt = 0; - for (TimelineEntity entity : entities) { - metricCnt += entity.getMetrics().size(); - } - assertEquals(2, metricCnt); - - TimelineFilterList metricFilterList2 = new TimelineFilterList( - new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32), - new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57)); - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList2, null), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); - assertEquals(0, entities.size()); - - TimelineFilterList metricFilterList3 = new TimelineFilterList( - new TimelineCompareFilter(TimelineCompareOp.EQUAL, "s_metric", 32)); - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList3, null), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS))); - assertEquals(0, entities.size()); - - TimelineFilterList list3 = new TimelineFilterList(); - list3.addFilter(new TimelineCompareFilter( - TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101)); - TimelineFilterList list4 = new TimelineFilterList(); - list4.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_THAN, metric1, 43)); - list4.addFilter(new TimelineCompareFilter( - TimelineCompareOp.EQUAL, metric2, 57)); - TimelineFilterList metricFilterList4 = - new TimelineFilterList(Operator.OR, list3, list4); - TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR, - new TimelinePrefixFilter(TimelineCompareOp.EQUAL, - metric2.substring(0, metric2.indexOf("_") + 1))); - entities = hbr.getEntities( - new TimelineReaderContext(cluster, user, flow, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList4, null), - new TimelineDataToRetrieve(null, metricsToRetrieve, - EnumSet.of(Field.ALL))); - assertEquals(2, entities.size()); - metricCnt = 0; - for (TimelineEntity entity : entities) { - metricCnt += entity.getMetrics().size(); - } - assertEquals(1, metricCnt); - } finally { - hbr.close(); - } - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org