YARN-3984. Adjusted the event column key schema and avoided missing empty event. Contributed by Vrushali C.
(cherry picked from commit 895ccfa1ab9e701f2908586e323249f670fe5544) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc7df298 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc7df298 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc7df298 Branch: refs/heads/YARN-2928-rebase Commit: dc7df298ab723f10d915064f0ae89397beb4bd79 Parents: 32fc2d3 Author: Zhijie Shen <zjs...@apache.org> Authored: Wed Aug 5 16:28:57 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:10 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../storage/HBaseTimelineWriterImpl.java | 23 +++- .../storage/common/TimelineWriterUtils.java | 13 +++ .../storage/entity/EntityRowKey.java | 18 +--- .../storage/TestHBaseTimelineWriterImpl.java | 105 ++++++++++++++++--- 5 files changed, 128 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc7df298/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fc940fe..075d733 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -121,6 +121,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3993. Changed to use the AM flag in ContainerContext determine AM container in TestPerNodeTimelineCollectorsAuxService. (Sunil G via zjshen) + YARN-3984. Adjusted the event column key schema and avoided missing empty + event. 
(Vrushali C via zjshen) + Trunk - Unreleased INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc7df298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index cd2e76e..3173e87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -200,20 +201,32 @@ public class HBaseTimelineWriterImpl extends AbstractService implements "! 
Using the current timestamp"); eventTimestamp = System.currentTimeMillis(); } + byte[] columnQualifierFirst = + Bytes.toBytes(Separator.VALUES.encode(eventId)); + byte[] columnQualifierWithTsBytes = + Separator.VALUES.join(columnQualifierFirst, + Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp))); Map<String, Object> eventInfo = event.getInfo(); - if (eventInfo != null) { + if ((eventInfo == null) || (eventInfo.size() == 0)) { + // add separator since event key is empty + byte[] compoundColumnQualifierBytes = + Separator.VALUES.join(columnQualifierWithTsBytes, + null); + String compoundColumnQualifier = + Bytes.toString(compoundColumnQualifierBytes); + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES); + } else { for (Map.Entry<String, Object> info : eventInfo.entrySet()) { // eventId?infoKey - byte[] columnQualifierFirst = - Bytes.toBytes(Separator.VALUES.encode(eventId)); byte[] compoundColumnQualifierBytes = - Separator.VALUES.join(columnQualifierFirst, + Separator.VALUES.join(columnQualifierWithTsBytes, Bytes.toBytes(info.getKey())); // convert back to string to avoid additional API on store. 
String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); EntityColumnPrefix.EVENT.store(rowKey, entityTable, - compoundColumnQualifier, eventTimestamp, info.getValue()); + compoundColumnQualifier, null, info.getValue()); } // for info: eventInfo } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc7df298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java index 28a0b6a..c957bf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java @@ -124,4 +124,17 @@ public class TimelineWriterUtils { return segments; } + /** + * Converts a timestamp into its inverse timestamp to be used in (row) keys + * where we want to have the most recent timestamp in the top of the table + * (scans start at the most recent timestamp first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan.
+ * @return inverted long + */ + public static long invert(Long key) { + return Long.MAX_VALUE - key; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc7df298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 61958c2..3e17ad0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; /** * Represents a rowkey for the entity table. @@ -47,7 +48,7 @@ public class EntityRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. 
- byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId)); + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId)); return Separator.QUALIFIERS.join(first, second, third); } @@ -70,24 +71,11 @@ public class EntityRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId)); + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(), te.getId())); return Separator.QUALIFIERS.join(first, second, third); } - /** - * Converts a timestamp into it's inverse timestamp to be used in (row) keys - * where we want to have the most recent timestamp in the top of the table - * (scans start at the most recent timestamp first). - * - * @param key value to be inverted so that the latest version will be first in - * a scan. 
- * @return inverted long - */ - public static long invert(Long key) { - return Long.MAX_VALUE - key; - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc7df298/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 31cb5d2..fd5643d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -274,7 +274,7 @@ public class TestHBaseTimelineWriterImpl { assertEquals(user, Bytes.toString(rowKeyComponents[0])); assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); assertEquals(flow, Bytes.toString(rowKeyComponents[2])); - assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3])); + assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3])); assertEquals(appName, Bytes.toString(rowKeyComponents[4])); assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5])); assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6])); @@ -317,7 +317,6 @@ public class TestHBaseTimelineWriterImpl { byte[] startRow = EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); s.setStartRow(startRow); - s.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s); @@ -331,24 +330,23 @@ public class TestHBaseTimelineWriterImpl { assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, entity)); - // check the events - NavigableMap<String, NavigableMap<Long, Object>> eventsResult = - EntityColumnPrefix.EVENT.readResultsWithTimestamps(result); + Map<String, Object> eventsResult = + EntityColumnPrefix.EVENT.readResults(result); // there should be only one event assertEquals(1, eventsResult.size()); // key name for the event - String valueKey = eventId + Separator.VALUES.getValue() + expKey; - for (Map.Entry<String, NavigableMap<Long, Object>> e : + byte[] compoundColumnQualifierBytes = + Separator.VALUES.join(Bytes.toBytes(eventId), + Bytes.toBytes(TimelineWriterUtils.invert(expTs)), + Bytes.toBytes(expKey)); + String valueKey = Bytes.toString(compoundColumnQualifierBytes); + for (Map.Entry<String, Object> e : eventsResult.entrySet()) { // the value key must match assertEquals(valueKey, e.getKey()); - NavigableMap<Long, 
Object> value = e.getValue(); + Object value = e.getValue(); // there should be only one timestamp and value - assertEquals(1, value.size()); - for (Map.Entry<Long, Object> e2: value.entrySet()) { - assertEquals(expTs, e2.getKey()); - assertEquals(expVal, e2.getValue()); - } + assertEquals(expVal, value.toString()); } } } @@ -360,6 +358,85 @@ public class TestHBaseTimelineWriterImpl { } } + @Test + public void testAdditionalEntityEmptyEventInfo() throws IOException { + TimelineEvent event = new TimelineEvent(); + String eventId = "foo_event_id"; + event.setId(eventId); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + + final TimelineEntity entity = new TimelineEntity(); + entity.setId("attempt_1329348432655_0001_m_000008_18"); + entity.setType("FOO_ATTEMPT"); + entity.addEvent(event); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String cluster = "cluster_emptyeventkey"; + String user = "user_emptyeventkey"; + String flow = "other_flow_name"; + String flowVersion = "1111F01C2287BA"; + long runid = 1009876543218L; + String appName = "some app name"; + byte[] startRow = + EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); + hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.stop(); + // scan the table and see that entity exists + Scan s = new Scan(); + s.setStartRow(startRow); + s.addFamily(EntityColumnFamily.INFO.getBytes()); + Connection conn = ConnectionFactory.createConnection(c1); + ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s); + + int rowCount = 0; + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + + // check the row key + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + 
Map<String, Object> eventsResult = + EntityColumnPrefix.EVENT.readResults(result); + // there should be only one event + assertEquals(1, eventsResult.size()); + // key name for the event + byte[] compoundColumnQualifierWithTsBytes = + Separator.VALUES.join(Bytes.toBytes(eventId), + Bytes.toBytes(TimelineWriterUtils.invert(expTs))); + byte[] compoundColumnQualifierBytes = + Separator.VALUES.join(compoundColumnQualifierWithTsBytes, + null); + String valueKey = Bytes.toString(compoundColumnQualifierBytes); + for (Map.Entry<String, Object> e : + eventsResult.entrySet()) { + // the column qualifier key must match + assertEquals(valueKey, e.getKey()); + Object value = e.getValue(); + // value should be empty + assertEquals("", value.toString()); + } + } + } + assertEquals(1, rowCount); + + } finally { + hbi.stop(); + hbi.close(); + } + } + + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();