YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. Contributed by Vrushali C and Sangjin Lee.
(cherry picked from commit df0ec473a84871b0effd7ca6faac776210d7df09)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f488b613
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f488b613
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f488b613

Branch: refs/heads/YARN-2928
Commit: f488b6136cd61b5dacb4ef31ccaf902cc8736be6
Parents: a0c1e50
Author: Zhijie Shen <zjs...@apache.org>
Authored: Mon Jul 27 15:50:28 2015 -0700
Committer: Vinod Kumar Vavilapalli <vino...@apache.org>
Committed: Fri Aug 14 11:23:26 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../records/timelineservice/TimelineEvent.java  |  4 +-
 .../storage/HBaseTimelineWriterImpl.java        | 18 ++++++-
 .../storage/common/ColumnHelper.java            | 21 ++++----
 .../storage/common/ColumnPrefix.java            |  7 +--
 .../storage/common/Separator.java               |  7 +++
 .../storage/entity/EntityColumnPrefix.java      | 15 ++++--
 .../storage/entity/EntityTable.java             |  6 ++-
 .../storage/TestHBaseTimelineWriterImpl.java    | 56 ++++++++++++++++++--
 9 files changed, 111 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cd05140..60bd2fd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -115,6 +115,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3792. Test case failures in TestDistributedShell and some issue fixes
     related to ATSV2 (Naganarasimha G R via sjlee)
 
+    YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. (Vrushali C and Sangjin
+    Lee via zjshen)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
index 1dbf7e5..a563658 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
@@ -33,6 +33,8 @@ import java.util.Map;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class TimelineEvent implements Comparable<TimelineEvent> {
+  public static final long INVALID_TIMESTAMP = 0L;
+
   private String id;
   private HashMap<String, Object> info = new HashMap<>();
   private long timestamp;
@@ -83,7 +85,7 @@ public class TimelineEvent implements Comparable<TimelineEvent> {
   }
 
   public boolean isValid() {
-    return (id != null && timestamp != 0L);
+    return (id != null && timestamp != INVALID_TIMESTAMP);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 876ad6a..cd2e76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -141,6 +141,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
         te.getModifiedTime());
     EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+    Map<String, Object> info = te.getInfo();
+    if (info != null) {
+      for (Map.Entry<String, Object> entry : info.entrySet()) {
+        EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
+            null, entry.getValue());
+      }
+    }
   }
 
   /**
@@ -186,6 +193,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (event != null) {
         String eventId = event.getId();
         if (eventId != null) {
+          long eventTimestamp = event.getTimestamp();
+          // if the timestamp is not set, use the current timestamp
+          if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+            LOG.warn("timestamp is not set for event " + eventId +
+                "! Using the current timestamp");
+            eventTimestamp = System.currentTimeMillis();
+          }
           Map<String, Object> eventInfo = event.getInfo();
           if (eventInfo != null) {
             for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
@@ -198,8 +212,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               // convert back to string to avoid additional API on store.
               String compoundColumnQualifier =
                   Bytes.toString(compoundColumnQualifierBytes);
-              EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-                  compoundColumnQualifier, null, info.getValue());
+              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                  compoundColumnQualifier, eventTimestamp, info.getValue());
             } // for info: eventInfo
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index 6a204dc..a902924 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -113,19 +113,22 @@ public class ColumnHelper<T> {
   }
 
   /**
-   * @param result from which to reads timeseries data
+   * @param result from which to reads data with timestamps
    * @param columnPrefixBytes optional prefix to limit columns. If null all
    *          columns are returned.
+   * @param <V> the type of the values. The values will be cast into that type.
    * @return the cell values at each respective time in for form
    *         {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}
   * @throws IOException
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
-      Result result, byte[] columnPrefixBytes) throws IOException {
+  @SuppressWarnings("unchecked")
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, byte[] columnPrefixBytes)
+          throws IOException {
 
-    NavigableMap<String, NavigableMap<Long, Number>> results =
-        new TreeMap<String, NavigableMap<Long, Number>>();
+    NavigableMap<String, NavigableMap<Long, V>> results =
+        new TreeMap<String, NavigableMap<Long, V>>();
 
     if (result != null) {
       NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
@@ -157,13 +160,13 @@ public class ColumnHelper<T> {
 
           // If this column has the prefix we want
           if (columnName != null) {
-            NavigableMap<Long, Number> cellResults =
-                new TreeMap<Long, Number>();
+            NavigableMap<Long, V> cellResults =
+                new TreeMap<Long, V>();
             NavigableMap<Long, byte[]> cells = entry.getValue();
             if (cells != null) {
               for (Entry<Long, byte[]> cell : cells.entrySet()) {
-                Number value =
-                    (Number) GenericObjectMapper.read(cell.getValue());
+                V value =
+                    (V) GenericObjectMapper.read(cell.getValue());
                 cellResults.put(cell.getKey(), value);
               }
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 2eedea0..671c824 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -72,12 +72,13 @@ public interface ColumnPrefix<T> {
   public Map<String, Object> readResults(Result result) throws IOException;
 
   /**
-   * @param result from which to reads timeseries data
+   * @param result from which to reads data with timestamps
+   * @param <V> the type of the values. The values will be cast into that type.
    * @return the cell values at each respective time in for form
    *         {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}
    * @throws IOException
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
-      Result result) throws IOException;
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index ee57890..3319419 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -90,6 +90,13 @@ public enum Separator {
   }
 
   /**
+   * @return the original value of the separator
+   */
+  public String getValue() {
+    return value;
+  }
+
+  /**
    * Used to make token safe to be used with this separator without collisions.
    *
    * @param token

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 4459868..8b7bc3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -45,6 +45,11 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   RELATES_TO(EntityColumnFamily.INFO, "r"),
 
   /**
+   * To store TimelineEntity info values.
+   */
+  INFO(EntityColumnFamily.INFO, "i"),
+
+  /**
    * Lifecycle events for an entity
    */
   EVENT(EntityColumnFamily.INFO, "e"),
@@ -92,7 +97,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   /**
    * @return the column name value
    */
-  private String getColumnPrefix() {
+  public String getColumnPrefix() {
     return columnPrefix;
   }
 
@@ -150,11 +155,11 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
-      Result result) throws IOException {
-    return column.readTimeseriesResults(result, columnPrefixBytes);
+  public <T> NavigableMap<String, NavigableMap<Long, T>>
+      readResultsWithTimestamps(Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 61f7c4c..2ae7d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -54,7 +54,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
  * | | modified_time: | | |
  * | | 1392995081012 | metricId2: | |
  * | | | metricValue1 | |
- * | | r!relatesToKey: | @timestamp2 | |
+ * | | i!infoKey: | @timestamp2 | |
+ * | | infoValue | | |
+ * | | | | |
+ * | | r!relatesToKey: | | |
  * | | id3?id4?id5 | | |
  * | | | | |
  * | | s!isRelatedToKey | | |
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
  * | | | | |
  * | | e!eventId?eventInfoKey: | | |
  * | | eventInfoValue | | |
+ * | | @timestamp | | |
  * | | | | |
  * | | flowVersion: | | |
  * | | versionValue | | |

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 6abf240..31cb5d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -43,8 +43,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -84,6 +86,12 @@ public class TestHBaseTimelineWriterImpl {
       entity.setCreatedTime(cTime);
       entity.setModifiedTime(mTime);
 
+      // add the info map in Timeline Entity
+      Map<String, Object> infoMap = new HashMap<String, Object>();
+      infoMap.put("infoMapKey1", "infoMapValue1");
+      infoMap.put("infoMapKey2", 10);
+      entity.addInfo(infoMap);
+
       // add the isRelatedToEntity info
       String key = "task";
       String value = "is_related_to_entity_id_here";
@@ -177,6 +185,14 @@ public class TestHBaseTimelineWriterImpl {
           Long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);
 
+          Map<String, Object> infoColumns =
+              EntityColumnPrefix.INFO.readResults(result);
+          assertEquals(infoMap.size(), infoColumns.size());
+          for (String infoItem : infoMap.keySet()) {
+            assertEquals(infoMap.get(infoItem),
+                infoColumns.get(infoItem));
+          }
+
          // Remember isRelatedTo is of type Map<String, Set<String>>
          for (String isRelatedToKey : isRelatedTo.keySet()) {
            Object isRelatedToValue =
@@ -219,7 +235,7 @@ public class TestHBaseTimelineWriterImpl {
           }
 
           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-              EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
           // We got metrics back
@@ -237,7 +253,7 @@ public class TestHBaseTimelineWriterImpl {
         }
       }
       assertEquals(1, rowCount);
-      assertEquals(15, colCount);
+      assertEquals(17, colCount);
 
     } finally {
       hbi.stop();
@@ -267,13 +283,18 @@ public class TestHBaseTimelineWriterImpl {
 
   private void testAdditionalEntity() throws IOException {
     TimelineEvent event = new TimelineEvent();
-    event.setId("foo_event_id");
-    event.setTimestamp(System.currentTimeMillis());
-    event.addInfo("foo_event", "test");
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
 
     final TimelineEntity entity = new TimelineEntity();
     entity.setId("attempt_1329348432655_0001_m_000008_18");
     entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
 
     TimelineEntities entities = new TimelineEntities();
     entities.addEntity(entity);
@@ -304,6 +325,31 @@ public class TestHBaseTimelineWriterImpl {
       for (Result result : scanner) {
         if (result != null && !result.isEmpty()) {
           rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          // check the events
+          NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
+              EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          String valueKey = eventId + Separator.VALUES.getValue() + expKey;
+          for (Map.Entry<String, NavigableMap<Long, Object>> e :
+              eventsResult.entrySet()) {
+            // the value key must match
+            assertEquals(valueKey, e.getKey());
+            NavigableMap<Long, Object> value = e.getValue();
+            // there should be only one timestamp and value
+            assertEquals(1, value.size());
+            for (Map.Entry<Long, Object> e2: value.entrySet()) {
+              assertEquals(expTs, e2.getKey());
+              assertEquals(expVal, e2.getValue());
+            }
+          }
         }
       }
       assertEquals(1, rowCount);