YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. Contributed by Vrushali C and 
Sangjin Lee.

(cherry picked from commit df0ec473a84871b0effd7ca6faac776210d7df09)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f488b613
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f488b613
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f488b613

Branch: refs/heads/YARN-2928
Commit: f488b6136cd61b5dacb4ef31ccaf902cc8736be6
Parents: a0c1e50
Author: Zhijie Shen <zjs...@apache.org>
Authored: Mon Jul 27 15:50:28 2015 -0700
Committer: Vinod Kumar Vavilapalli <vino...@apache.org>
Committed: Fri Aug 14 11:23:26 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../records/timelineservice/TimelineEvent.java  |  4 +-
 .../storage/HBaseTimelineWriterImpl.java        | 18 ++++++-
 .../storage/common/ColumnHelper.java            | 21 ++++----
 .../storage/common/ColumnPrefix.java            |  7 +--
 .../storage/common/Separator.java               |  7 +++
 .../storage/entity/EntityColumnPrefix.java      | 15 ++++--
 .../storage/entity/EntityTable.java             |  6 ++-
 .../storage/TestHBaseTimelineWriterImpl.java    | 56 ++++++++++++++++++--
 9 files changed, 111 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cd05140..60bd2fd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -115,6 +115,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3792. Test case failures in TestDistributedShell and some issue fixes
     related to ATSV2 (Naganarasimha G R via sjlee)
 
+    YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. (Vrushali C and Sangjin
+    Lee via zjshen)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
index 1dbf7e5..a563658 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
@@ -33,6 +33,8 @@ import java.util.Map;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class TimelineEvent implements Comparable<TimelineEvent> {
+  public static final long INVALID_TIMESTAMP = 0L;
+
   private String id;
   private HashMap<String, Object> info = new HashMap<>();
   private long timestamp;
@@ -83,7 +85,7 @@ public class TimelineEvent implements 
Comparable<TimelineEvent> {
   }
 
   public boolean isValid() {
-    return (id != null && timestamp != 0L);
+    return (id != null && timestamp != INVALID_TIMESTAMP);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 876ad6a..cd2e76e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -141,6 +141,13 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
     EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
         te.getModifiedTime());
     EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+    Map<String, Object> info = te.getInfo();
+    if (info != null) {
+      for (Map.Entry<String, Object> entry : info.entrySet()) {
+        EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
+            null, entry.getValue());
+      }
+    }
   }
 
   /**
@@ -186,6 +193,13 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
         if (event != null) {
           String eventId = event.getId();
           if (eventId != null) {
+            long eventTimestamp = event.getTimestamp();
+            // if the timestamp is not set, use the current timestamp
+            if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+              LOG.warn("timestamp is not set for event " + eventId +
+                  "! Using the current timestamp");
+              eventTimestamp = System.currentTimeMillis();
+            }
             Map<String, Object> eventInfo = event.getInfo();
             if (eventInfo != null) {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
@@ -198,8 +212,8 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
                 // convert back to string to avoid additional API on store.
                 String compoundColumnQualifier =
                     Bytes.toString(compoundColumnQualifierBytes);
-                EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-                    compoundColumnQualifier, null, info.getValue());
+                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    compoundColumnQualifier, eventTimestamp, info.getValue());
               } // for info: eventInfo
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index 6a204dc..a902924 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -113,19 +113,22 @@ public class ColumnHelper<T> {
   }
 
   /**
-   * @param result from which to reads timeseries data
+   * @param result from which to reads data with timestamps
    * @param columnPrefixBytes optional prefix to limit columns. If null all
    *          columns are returned.
+   * @param <V> the type of the values. The values will be cast into that type.
    * @return the cell values at each respective time in for form
    *         {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}
    * @throws IOException
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> 
readTimeseriesResults(
-      Result result, byte[] columnPrefixBytes) throws IOException {
+  @SuppressWarnings("unchecked")
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, byte[] columnPrefixBytes)
+          throws IOException {
 
-    NavigableMap<String, NavigableMap<Long, Number>> results =
-        new TreeMap<String, NavigableMap<Long, Number>>();
+    NavigableMap<String, NavigableMap<Long, V>> results =
+        new TreeMap<String, NavigableMap<Long, V>>();
 
     if (result != null) {
       NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 
resultMap =
@@ -157,13 +160,13 @@ public class ColumnHelper<T> {
 
           // If this column has the prefix we want
           if (columnName != null) {
-            NavigableMap<Long, Number> cellResults =
-                new TreeMap<Long, Number>();
+            NavigableMap<Long, V> cellResults =
+                new TreeMap<Long, V>();
             NavigableMap<Long, byte[]> cells = entry.getValue();
             if (cells != null) {
               for (Entry<Long, byte[]> cell : cells.entrySet()) {
-                Number value =
-                    (Number) GenericObjectMapper.read(cell.getValue());
+                V value =
+                    (V) GenericObjectMapper.read(cell.getValue());
                 cellResults.put(cell.getKey(), value);
               }
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 2eedea0..671c824 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -72,12 +72,13 @@ public interface ColumnPrefix<T> {
   public Map<String, Object> readResults(Result result) throws IOException;
 
   /**
-   * @param result from which to reads timeseries data
+   * @param result from which to reads data with timestamps
+   * @param <V> the type of the values. The values will be cast into that type.
    * @return the cell values at each respective time in for form
    *         {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}
    * @throws IOException
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> 
readTimeseriesResults(
-      Result result) throws IOException;
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index ee57890..3319419 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -90,6 +90,13 @@ public enum Separator {
   }
 
   /**
+   * @return the original value of the separator
+   */
+  public String getValue() {
+    return value;
+  }
+
+  /**
    * Used to make token safe to be used with this separator without collisions.
    *
    * @param token

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 4459868..8b7bc3e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -45,6 +45,11 @@ public enum EntityColumnPrefix implements 
ColumnPrefix<EntityTable> {
   RELATES_TO(EntityColumnFamily.INFO, "r"),
 
   /**
+   * To store TimelineEntity info values.
+   */
+  INFO(EntityColumnFamily.INFO, "i"),
+
+  /**
    * Lifecycle events for an entity
    */
   EVENT(EntityColumnFamily.INFO, "e"),
@@ -92,7 +97,7 @@ public enum EntityColumnPrefix implements 
ColumnPrefix<EntityTable> {
   /**
    * @return the column name value
    */
-  private String getColumnPrefix() {
+  public String getColumnPrefix() {
     return columnPrefix;
   }
 
@@ -150,11 +155,11 @@ public enum EntityColumnPrefix implements 
ColumnPrefix<EntityTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
    */
-  public NavigableMap<String, NavigableMap<Long, Number>> 
readTimeseriesResults(
-      Result result) throws IOException {
-    return column.readTimeseriesResults(result, columnPrefixBytes);
+  public <T> NavigableMap<String, NavigableMap<Long, T>>
+      readResultsWithTimestamps(Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 61f7c4c..2ae7d39 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -54,7 +54,10 @@ import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
  * |            | modified_time:          |              |              |
  * |            | 1392995081012           | metricId2:   |              |
  * |            |                         | metricValue1 |              |
- * |            | r!relatesToKey:         | @timestamp2  |              |
+ * |            | i!infoKey:              | @timestamp2  |              |
+ * |            | infoValue               |              |              |
+ * |            |                         |              |              |
+ * |            | r!relatesToKey:         |              |              |
  * |            | id3?id4?id5             |              |              |
  * |            |                         |              |              |
  * |            | s!isRelatedToKey        |              |              |
@@ -62,6 +65,7 @@ import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
  * |            |                         |              |              |
  * |            | e!eventId?eventInfoKey: |              |              |
  * |            | eventInfoValue          |              |              |
+ * |            | @timestamp              |              |              |
  * |            |                         |              |              |
  * |            | flowVersion:            |              |              |
  * |            | versionValue            |              |              |

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f488b613/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 6abf240..31cb5d2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -43,8 +43,10 @@ import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -84,6 +86,12 @@ public class TestHBaseTimelineWriterImpl {
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
 
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
     // add the isRelatedToEntity info
     String key = "task";
     String value = "is_related_to_entity_id_here";
@@ -177,6 +185,14 @@ public class TestHBaseTimelineWriterImpl {
           Long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);
 
+          Map<String, Object> infoColumns =
+              EntityColumnPrefix.INFO.readResults(result);
+          assertEquals(infoMap.size(), infoColumns.size());
+          for (String infoItem : infoMap.keySet()) {
+            assertEquals(infoMap.get(infoItem),
+                infoColumns.get(infoItem));
+          }
+
           // Remember isRelatedTo is of type Map<String, Set<String>>
           for (String isRelatedToKey : isRelatedTo.keySet()) {
             Object isRelatedToValue =
@@ -219,7 +235,7 @@ public class TestHBaseTimelineWriterImpl {
           }
 
           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-              EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
           // We got metrics back
@@ -237,7 +253,7 @@ public class TestHBaseTimelineWriterImpl {
         }
       }
       assertEquals(1, rowCount);
-      assertEquals(15, colCount);
+      assertEquals(17, colCount);
 
     } finally {
       hbi.stop();
@@ -267,13 +283,18 @@ public class TestHBaseTimelineWriterImpl {
 
   private void testAdditionalEntity() throws IOException {
     TimelineEvent event = new TimelineEvent();
-    event.setId("foo_event_id");
-    event.setTimestamp(System.currentTimeMillis());
-    event.addInfo("foo_event", "test");
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
 
     final TimelineEntity entity = new TimelineEntity();
     entity.setId("attempt_1329348432655_0001_m_000008_18");
     entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
 
     TimelineEntities entities = new TimelineEntities();
     entities.addEntity(entity);
@@ -304,6 +325,31 @@ public class TestHBaseTimelineWriterImpl {
       for (Result result : scanner) {
         if (result != null && !result.isEmpty()) {
           rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          // check the events
+          NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
+              EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          String valueKey = eventId + Separator.VALUES.getValue() + expKey;
+          for (Map.Entry<String, NavigableMap<Long, Object>> e :
+              eventsResult.entrySet()) {
+            // the value key must match
+            assertEquals(valueKey, e.getKey());
+            NavigableMap<Long, Object> value = e.getValue();
+            // there should be only one timestamp and value
+            assertEquals(1, value.size());
+            for (Map.Entry<Long, Object> e2: value.entrySet()) {
+              assertEquals(expTs, e2.getKey());
+              assertEquals(expVal, e2.getValue());
+            }
+          }
         }
       }
       assertEquals(1, rowCount);

Reply via email to