http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0137ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index ab02779..95f88d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -47,6 +48,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@@ -60,7 +65,15 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * @throws Exception
+ * Various tests to test writing entities to HBase and reading them back from
+ * it.
+ *
+ * It uses a single HBase mini-cluster for all tests which is a little more
+ * realistic, and helps test correctness in the presence of other data.
+ *
+ * Each test uses a different cluster name to be able to handle its own data
+ * even if other records exist in the table. Use a different cluster name if
+ * you add a new test.
*/
public class TestHBaseTimelineWriterImpl {
@@ -78,6 +91,199 @@ public class TestHBaseTimelineWriterImpl {
.createTable(util.getHBaseAdmin(), util.getConfiguration());
new AppToFlowTable()
.createTable(util.getHBaseAdmin(), util.getConfiguration());
+ new ApplicationTable()
+ .createTable(util.getHBaseAdmin(), util.getConfiguration());
+ }
+
+ @Test
+ public void testWriteApplicationToHBase() throws Exception {
+ TimelineEntities te = new TimelineEntities();
+ ApplicationEntity entity = new ApplicationEntity();
+ String id = "hello";
+ entity.setId(id);
+ Long cTime = 1425016501000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+
+ // add the info map in Timeline Entity
+ Map<String, Object> infoMap = new HashMap<String, Object>();
+ infoMap.put("infoMapKey1", "infoMapValue1");
+ infoMap.put("infoMapKey2", 10);
+ entity.addInfo(infoMap);
+
+ // add the isRelatedToEntity info
+ String key = "task";
+ String value = "is_related_to_entity_id_here";
+ Set<String> isRelatedToSet = new HashSet<String>();
+ isRelatedToSet.add(value);
+ Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+ isRelatedTo.put(key, isRelatedToSet);
+ entity.setIsRelatedToEntities(isRelatedTo);
+
+ // add the relatesTo info
+ key = "container";
+ value = "relates_to_entity_id_here";
+ Set<String> relatesToSet = new HashSet<String>();
+ relatesToSet.add(value);
+ value = "relates_to_entity_id_here_Second";
+ relatesToSet.add(value);
+ Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+ relatesTo.put(key, relatesToSet);
+ entity.setRelatesToEntities(relatesTo);
+
+ // add some config entries
+ Map<String, String> conf = new HashMap<String, String>();
+ conf.put("config_param1", "value1");
+ conf.put("config_param2", "value2");
+ entity.addConfigs(conf);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ te.addEntity(entity);
+
+ HBaseTimelineWriterImpl hbi = null;
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ Configuration c1 = util.getConfiguration();
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.start();
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ String cluster = "cluster_test_write_app";
+ String user = "user1";
+ String flow = "some_flow_name";
+ String flowVersion = "AB7822C10F1111";
+ long runid = 1002345678919L;
+ hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+ hbi.stop();
+
+ // retrieve the row
+ byte[] rowKey =
+ ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Result result = new ApplicationTable().getResult(c1, conn, get);
+
+ assertTrue(result != null);
+ assertEquals(16, result.size());
+
+ // check the row key
+ byte[] row1 = result.getRow();
+ assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+ id));
+
+ // check info column family
+ String id1 = ApplicationColumn.ID.readResult(result).toString();
+ assertEquals(id, id1);
+
+ Number val =
+ (Number) ApplicationColumn.CREATED_TIME.readResult(result);
+ Long cTime1 = val.longValue();
+ assertEquals(cTime1, cTime);
+
+ val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
+ Long mTime1 = val.longValue();
+ assertEquals(mTime1, mTime);
+
+ Map<String, Object> infoColumns =
+ ApplicationColumnPrefix.INFO.readResults(result);
+ assertEquals(infoMap.size(), infoColumns.size());
+ for (String infoItem : infoMap.keySet()) {
+ assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
+ }
+
+ // Remember isRelatedTo is of type Map<String, Set<String>>
+ for (String isRelatedToKey : isRelatedTo.keySet()) {
+ Object isRelatedToValue =
+ ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
+ isRelatedToKey);
+ String compoundValue = isRelatedToValue.toString();
+ // id7?id9?id6
+ Set<String> isRelatedToValues =
+ new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+ assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+ isRelatedToValues.size());
+ for (String v : isRelatedTo.get(isRelatedToKey)) {
+ assertTrue(isRelatedToValues.contains(v));
+ }
+ }
+
+ // RelatesTo
+ for (String relatesToKey : relatesTo.keySet()) {
+ String compoundValue =
+ ApplicationColumnPrefix.RELATES_TO.readResult(result,
+ relatesToKey).toString();
+ // id3?id4?id5
+ Set<String> relatesToValues =
+ new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+ assertEquals(relatesTo.get(relatesToKey).size(),
+ relatesToValues.size());
+ for (String v : relatesTo.get(relatesToKey)) {
+ assertTrue(relatesToValues.contains(v));
+ }
+ }
+
+ // Configuration
+ Map<String, Object> configColumns =
+ ApplicationColumnPrefix.CONFIG.readResults(result);
+ assertEquals(conf.size(), configColumns.size());
+ for (String configItem : conf.keySet()) {
+ assertEquals(conf.get(configItem), configColumns.get(configItem));
+ }
+
+ NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+ ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+ NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+ // We got metrics back
+ assertNotNull(metricMap);
+ // Same number of metrics as we wrote
+      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
+
+ // Iterate over original metrics and confirm that they are present
+ // here.
+ for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+ assertEquals(metricEntry.getValue(),
+ metricMap.get(metricEntry.getKey()));
+ }
+
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+ entity.getType(), entity.getId(),
+ EnumSet.of(TimelineReader.Field.ALL));
+ Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+ id, entity.getType(), null, null, null, null, null, null, null,
+ null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ assertNotNull(e1);
+ assertEquals(1, es1.size());
+ } finally {
+ if (hbi != null) {
+ hbi.stop();
+ hbi.close();
+ }
+ if (hbr != null) {
+ hbr.stop();
+ hbr.close();
+ }
+ }
}
@Test
@@ -154,7 +360,7 @@ public class TestHBaseTimelineWriterImpl {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
- String cluster = "cluster1";
+ String cluster = "cluster_test_write_entity";
String user = "user1";
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
@@ -268,7 +474,8 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(17, colCount);
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-      entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ entity.getType(), entity.getId(),
+ EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
@@ -284,10 +491,6 @@ public class TestHBaseTimelineWriterImpl {
hbr.close();
}
}
-
-    // Somewhat of a hack, not a separate test in order not to have to deal with
- // test case order exectution.
- testAdditionalEntity();
}
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
@@ -299,14 +502,31 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(user, Bytes.toString(rowKeyComponents[0]));
assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+ assertEquals(TimelineWriterUtils.invert(runid),
+ Bytes.toLong(rowKeyComponents[3]));
assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
return true;
}
- private void testAdditionalEntity() throws IOException {
+ private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
+ String user, String flow, Long runid, String appName) {
+
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+
+ assertTrue(rowKeyComponents.length == 5);
+ assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+ assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+ assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+ assertEquals(TimelineWriterUtils.invert(runid),
+ Bytes.toLong(rowKeyComponents[3]));
+ assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+ return true;
+ }
+
+ @Test
+ public void testEvents() throws IOException {
TimelineEvent event = new TimelineEvent();
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId);
@@ -333,7 +553,7 @@ public class TestHBaseTimelineWriterImpl {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
- String cluster = "cluster2";
+ String cluster = "cluster_test_events";
String user = "user2";
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
@@ -341,50 +561,46 @@ public class TestHBaseTimelineWriterImpl {
String appName = "some app name";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
- // scan the table and see that entity exists
- Scan s = new Scan();
- byte[] startRow =
- EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
- s.setStartRow(startRow);
- Connection conn = ConnectionFactory.createConnection(c1);
- ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
-
- int rowCount = 0;
- for (Result result : scanner) {
- if (result != null && !result.isEmpty()) {
- rowCount++;
-
- // check the row key
- byte[] row1 = result.getRow();
- assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
- entity));
- Map<String, Object> eventsResult =
- EntityColumnPrefix.EVENT.readResults(result);
- // there should be only one event
- assertEquals(1, eventsResult.size());
- // key name for the event
- byte[] compoundColumnQualifierBytes =
- Separator.VALUES.join(Bytes.toBytes(eventId),
- Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
- Bytes.toBytes(expKey));
- String valueKey = Bytes.toString(compoundColumnQualifierBytes);
- for (Map.Entry<String, Object> e :
- eventsResult.entrySet()) {
- // the value key must match
- assertEquals(valueKey, e.getKey());
- Object value = e.getValue();
- // there should be only one timestamp and value
- assertEquals(expVal, value.toString());
- }
- }
+ // retrieve the row
+ byte[] rowKey =
+ ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Result result = new ApplicationTable().getResult(c1, conn, get);
+
+ assertTrue(result != null);
+
+ // check the row key
+ byte[] row1 = result.getRow();
+ assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+ appName));
+
+ Map<String, Object> eventsResult =
+ ApplicationColumnPrefix.EVENT.readResults(result);
+ // there should be only one event
+ assertEquals(1, eventsResult.size());
+ // key name for the event
+ byte[] compoundColumnQualifierBytes =
+ Separator.VALUES.join(Bytes.toBytes(eventId),
+ Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+ Bytes.toBytes(expKey));
+ String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+ for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
+ // the value key must match
+ assertEquals(valueKey, e.getKey());
+ Object value = e.getValue();
+ // there should be only one timestamp and value
+ assertEquals(expVal, value.toString());
}
- assertEquals(1, rowCount);
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-      entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ entity.getType(), entity.getId(),
+ EnumSet.of(TimelineReader.Field.ALL));
TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-      entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ entity.getType(), entity.getId(),
+ EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
@@ -410,7 +626,7 @@ public class TestHBaseTimelineWriterImpl {
}
@Test
- public void testAdditionalEntityEmptyEventInfo() throws IOException {
+ public void testEventsWithEmptyInfo() throws IOException {
TimelineEvent event = new TimelineEvent();
String eventId = "foo_event_id";
event.setId(eventId);
@@ -435,7 +651,7 @@ public class TestHBaseTimelineWriterImpl {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
- String cluster = "cluster_emptyeventkey";
+ String cluster = "cluster_test_empty_eventkey";
String user = "user_emptyeventkey";
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
@@ -487,7 +703,8 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(1, rowCount);
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-      entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ entity.getType(), entity.getId(),
+ EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));