Repository: hadoop Updated Branches: refs/heads/feature-YARN-2928 7c861634e -> cdb96df97
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 38c0f3f..21ddcc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -83,6 +83,18 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + public byte[] getColumnPrefixBytes() { return columnPrefixBytes.clone(); } @@ -112,8 +124,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, @@ -233,8 +244,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, null, inputValue, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index eb055fe..e3bb52d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -89,8 +89,16 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { return columnPrefixBytes.clone(); } - public byte[] getColumnPrefixBytes(String qualifier) { - return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); } public byte[] getColumnFamilyBytes() { @@ -121,8 +129,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, @@ -149,8 +156,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index 4e23e49..e864d61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -266,7 +266,7 @@ public class TestFileSystemTimelineReaderImpl { // only the id, created and modified time TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -281,7 +281,7 @@ public class TestFileSystemTimelineReaderImpl { // Cluster and AppId should be enough to get an entity. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -298,7 +298,7 @@ public class TestFileSystemTimelineReaderImpl { // in app flow mapping csv has commas. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app2", - "app", "id_5", null); + "app", "id_5", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_5")).toString(), result.getIdentifier().toString()); @@ -311,7 +311,7 @@ public class TestFileSystemTimelineReaderImpl { // Specified fields in addition to default view will be returned. TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", + "app1", "app", "id_1", null, null, EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), @@ -329,8 +329,8 @@ public class TestFileSystemTimelineReaderImpl { public void testGetEntityAllFields() throws Exception { // All fields of TimelineEntity will be returned. TimelineEntity result = - reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", EnumSet.of(Field.ALL)); + reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app", + "id_1", null, null, EnumSet.of(Field.ALL)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -347,7 +347,7 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // All 3 entities will be returned Assert.assertEquals(4, result.size()); } @@ -357,7 +357,7 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 2L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Needs to be rewritten once hashcode and equals for // TimelineEntity is implemented @@ -371,7 +371,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 3L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // Even though 2 entities out of 4 have same created time, one entity // is left out due to limit Assert.assertEquals(3, result.size()); @@ -383,7 +383,7 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502030L, 1425016502060L, null, null, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_4 should be returned. for (TimelineEntity entity : result) { @@ -396,7 +396,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, 1425016502010L, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(3, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_4")) { @@ -408,7 +408,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502010L, null, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -420,7 +420,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016502090L, 1425016503020L, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_4 should be returned. for (TimelineEntity entity : result) { @@ -433,7 +433,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, 1425016502090L, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) { @@ -445,7 +445,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016503005L, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -462,7 +462,7 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, infoFilters, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_3 should be returned. for (TimelineEntity entity : result) { @@ -478,7 +478,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, configFilters, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { @@ -493,7 +493,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - eventFilters, null); + eventFilters, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_3")) { @@ -507,7 +507,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, metricFilters, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_2 should be returned. for (TimelineEntity entity : result) { @@ -527,7 +527,7 @@ public class TestFileSystemTimelineReaderImpl { Set<TimelineEntity> result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, relatesTo, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_1 should be returned. for (TimelineEntity entity : result) { @@ -544,7 +544,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, isRelatedTo, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_3 should be returned. for (TimelineEntity entity : result) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 30ead40..bc7b3a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + /** * Various tests to test writing entities to HBase and reading them back from * it. @@ -79,18 +90,344 @@ import org.junit.Test; public class TestHBaseTimelineStorage { private static HBaseTestingUtility util; + private HBaseTimelineReaderImpl reader; @BeforeClass public static void setupBeforeClass() throws Exception { util = new HBaseTestingUtility(); util.startMiniCluster(); createSchema(); + loadEntities(); + loadApps(); } private static void createSchema() throws IOException { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + private static void loadApps() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "application_1111111111_2222"; + entity.setId(id); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map<String, Object> infoMap = new HashMap<String, Object>(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set<String> isRelatedToSet = new HashSet<String>(); + isRelatedToSet.add(value); + Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set<String> relatesToSet = new HashSet<String>(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + // add some config entries + Map<String, String> conf = new HashMap<String, String>(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + TimelineEvent event = new TimelineEvent(); + event.setId("event1"); + event.setTimestamp(ts - 2000); + entity.addEvent(event); + te.addEntity(entity); + + TimelineEntities te1 = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "application_1111111111_3333"; + entity1.setId(id1); + entity1.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map<String, Object> infoMap1 = new HashMap<String, Object>(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set<String> isRelatedToSet1 = new HashSet<String>(); + isRelatedToSet1.add(value1); + Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>(); + isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set<String> relatesToSet1 = new HashSet<String>(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map<String, String> conf1 = new HashMap<String, String>(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set<TimelineMetric> metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map<Long, Number> metricValues1 = new HashMap<Long, Number>(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te1.addEntity(entity1); + + TimelineEntities te2 = new TimelineEntities(); + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "application_1111111111_4444"; + entity2.setId(id2); + entity2.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te2.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + appName = "application_1111111111_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + appName = "application_1111111111_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te2); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + private static void loadEntities() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map<String, Object> infoMap = new HashMap<String, Object>(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set<String> isRelatedToSet = new HashSet<String>(); + isRelatedToSet.add(value); + Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set<String> relatesToSet = new HashSet<String>(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map<String, String> conf = new HashMap<String, String>(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + te.addEntity(entity); + + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "hello1"; + entity1.setId(id1); + entity1.setType(type); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map<String, Object> infoMap1 = new HashMap<String, Object>(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set<String> isRelatedToSet1 = new HashSet<String>(); + isRelatedToSet1.add(value1); + Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>(); + isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set<String> relatesToSet1 = new HashSet<String>(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map<String, String> conf1 = new HashMap<String, String>(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set<TimelineMetric> metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map<Long, Number> metricValues1 = new HashMap<Long, Number>(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te.addEntity(entity1); + + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "hello2"; + entity2.setId(id2); + entity2.setType(type); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1231111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + @Before + public void init() throws Exception { + reader = new HBaseTimelineReaderImpl(); + reader.init(util.getConfiguration()); + reader.start(); + } + + @After + public void stop() throws Exception { + if (reader != null) { + reader.stop(); + reader.close(); + } + } + private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) { assertEquals(m1.size(), m2.size()); for (Map.Entry<Long, Number> entry : m2.entrySet()) { @@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage { te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_write_app"; String user = "user1"; String flow = "some_flow_name"; @@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage { matchMetrics(metricValues, metricMap); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); @@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage { te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_write_entity"; String user = "user1"; String flow = "some_flow_name"; @@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage { assertEquals(17, colCount); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid, + Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); @@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage { entities.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_events"; String user = "user2"; String flow = "other_flow_name"; @@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage { } // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName, - entity.getType(), entity.getId(), + TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertNotNull(e2); @@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage { entities.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_empty_eventkey"; String user = "user_emptyeventkey"; String flow = "other_flow_name"; @@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage { assertEquals(1, rowCount); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid, + Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); @@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage { } finally { hbi.stop(); hbi.close(); - hbr.stop();; - hbr.close(); } } @@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage { } } + @Test + public void testReadEntities() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1231111111_1111","world", "hello", null, + null, EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadEntitiesDefaultView() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadEntitiesByFields() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadEntitiesConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, + list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadEntitiesConfigFilterPrefix() throws Exception { + Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, confFilters, null, null, + list, null, null); + assertEquals(1, entities.size()); + int cfgCnt = 0; + for (TimelineEntity entity : entities) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(2, cfgCnt); + } + + @Test + public void testReadEntitiesMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @Test + public void testReadEntitiesMetricFilterPrefix() throws Exception { + Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, metricFilters, + null, null, list, null); + assertEquals(1, entities.size()); + int metricCnt = 0; + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(1, metricCnt); + } + + @Test + public void testReadApps() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadAppsDefaultView() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadAppsByFields() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadAppsConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadAppsConfigFilterPrefix() throws Exception { + Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, confFilters, null, null, list, null, null); + assertEquals(1, entities.size()); + int cfgCnt = 0; + for (TimelineEntity entity : entities) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(2, cfgCnt); + } + + @Test + public void testReadAppsMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @Test + public void testReadAppsMetricFilterPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS"); + Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, metricFilters, null, null, list, null); + int metricCnt = 0; + assertEquals(1, entities.size()); + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(1, metricCnt); + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index c957dad..434adac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -182,7 +182,7 @@ public class TestHBaseStorageFlowActivity { Set<TimelineEntity> entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; @@ -238,7 +238,7 @@ public class TestHBaseStorageFlowActivity { Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity entity = (FlowActivityEntity)e; @@ -353,7 +353,7 @@ public class TestHBaseStorageFlowActivity { Set<TimelineEntity> entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 4fb8f0e..5da0192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.EnumSet; import java.util.Map; import java.util.Set; @@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -178,9 +183,8 @@ public class TestHBaseStorageFlowRun { hbr.init(c1); hbr.start(); // get the flow run entity - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); FlowRunEntity flowRun = (FlowRunEntity)entity; assertEquals(minStartTs, flowRun.getStartTime()); @@ -238,9 +242,8 @@ public class TestHBaseStorageFlowRun { hbr = new HBaseTimelineReaderImpl(); hbr.init(c1); hbr.start(); - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); Set<TimelineMetric> metrics = entity.getMetrics(); assertEquals(2, metrics.size()); @@ -305,6 +308,181 @@ public class TestHBaseStorageFlowRun { assertEquals(1, rowCount); } + @Test + public void testWriteFlowRunMetricsPrefix() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + TimelineFilterList metricsToRetrieve = + new TimelineFilterList(new TimelinePrefixFilter(TimelineCompareOp.EQUAL, + metric1.substring(0, metric1.indexOf("_") + 1))); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, + metricsToRetrieve, null); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + Set<TimelineMetric> metrics = entity.getMetrics(); + assertEquals(1, metrics.size()); + for (TimelineMetric metric : metrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + + Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + metricsToRetrieve, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); + assertEquals(1, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + + @Test + public void testWriteFlowRunsMetricFields() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, null, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + assertEquals(0, timelineEntity.getMetrics().size()); + } + + entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.METRICS)); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); + assertEquals(2, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map<Long, Number> values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + case metric2: + assertEquals(57L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();