YARN-3984. Adjusted the event column key schema and avoided missing empty 
event. Contributed by Vrushali C.

(cherry picked from commit 895ccfa1ab9e701f2908586e323249f670fe5544)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1a1a22a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1a1a22a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1a1a22a

Branch: refs/heads/YARN-2928
Commit: d1a1a22a1341c06160a98de36a04e80c118df292
Parents: d93e636
Author: Zhijie Shen <zjs...@apache.org>
Authored: Wed Aug 5 16:28:57 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:47:15 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/HBaseTimelineWriterImpl.java        |  23 +++-
 .../storage/common/TimelineWriterUtils.java     |  13 +++
 .../storage/entity/EntityRowKey.java            |  18 +---
 .../storage/TestHBaseTimelineWriterImpl.java    | 105 ++++++++++++++++---
 5 files changed, 128 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
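
For context, the schema change in this patch: previously, an event's info
entries were written to columns named "eventId?infoKey" (the notation used in
the code comment below; "?" stands for the Separator.VALUES byte), with the
event timestamp carried only as the HBase cell timestamp. An event whose info
map was empty therefore produced no column at all and silently disappeared on
read. After this patch the inverted timestamp is encoded into the column
qualifier itself, and an empty info map still writes a marker column:

  old qualifier: eventId?infoKey                    (timestamp in cell version)
  new qualifier: eventId?invertedTimestamp?infoKey  (timestamp in the key)
  empty event:   eventId?invertedTimestamp?         (empty value bytes)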


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1a1a22a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f5ce32a..a484040 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -121,6 +121,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3993. Changed to use the AM flag in ContainerContext to determine AM
     container in TestPerNodeTimelineCollectorsAuxService. (Sunil G via zjshen)
 
+    YARN-3984. Adjusted the event column key schema and avoided missing empty
+    event. (Vrushali C via zjshen)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1a1a22a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index cd2e76e..3173e87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -200,20 +201,32 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
                   "! Using the current timestamp");
               eventTimestamp = System.currentTimeMillis();
             }
+            byte[] columnQualifierFirst =
+                Bytes.toBytes(Separator.VALUES.encode(eventId));
+            byte[] columnQualifierWithTsBytes =
+                Separator.VALUES.join(columnQualifierFirst,
+                    Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
             Map<String, Object> eventInfo = event.getInfo();
-            if (eventInfo != null) {
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              // add separator since event key is empty
+              byte[] compoundColumnQualifierBytes =
+                  Separator.VALUES.join(columnQualifierWithTsBytes,
+                      null);
+              String compoundColumnQualifier =
+                  Bytes.toString(compoundColumnQualifierBytes);
+              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                  compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+            } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
                 // eventId?infoKey
-                byte[] columnQualifierFirst =
-                    Bytes.toBytes(Separator.VALUES.encode(eventId));
                 byte[] compoundColumnQualifierBytes =
-                    Separator.VALUES.join(columnQualifierFirst,
+                    Separator.VALUES.join(columnQualifierWithTsBytes,
                         Bytes.toBytes(info.getKey()));
                 // convert back to string to avoid additional API on store.
                 String compoundColumnQualifier =
                     Bytes.toString(compoundColumnQualifierBytes);
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    compoundColumnQualifier, eventTimestamp, info.getValue());
+                    compoundColumnQualifier, null, info.getValue());
               } // for info: eventInfo
             }
           }
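
To make the joining logic above concrete, here is a minimal, self-contained
Java sketch of the same compound-qualifier construction. It is not the
project's Separator class: the single '?' separator byte, the class and
method names, and the sample values are illustrative assumptions, and the
real Separator.VALUES.join also percent-encodes separator occurrences inside
each component, which this sketch skips.

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EventQualifierSketch {
  // Illustrative stand-in for the Separator.VALUES byte.
  private static final byte SEP = (byte) '?';

  // Mirrors columnQualifierWithTsBytes: eventId + separator + inverted ts.
  static byte[] withInvertedTs(String eventId, long eventTimestamp) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] id = eventId.getBytes(StandardCharsets.UTF_8);
    out.write(id, 0, id.length);
    out.write(SEP);
    byte[] ts = ByteBuffer.allocate(Long.BYTES)
        .putLong(Long.MAX_VALUE - eventTimestamp).array();
    out.write(ts, 0, ts.length);
    return out.toByteArray();
  }

  // Mirrors the final join: a null info key still appends the trailing
  // separator, which is how the patch keeps an empty event from vanishing
  // (the column exists even though its value is empty).
  static byte[] qualifier(byte[] withTs, String infoKey) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(withTs, 0, withTs.length);
    out.write(SEP);
    if (infoKey != null) {
      byte[] key = infoKey.getBytes(StandardCharsets.UTF_8);
      out.write(key, 0, key.length);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] withTs = withInvertedTs("foo_event_id", 1436512802000L);
    // 12 id bytes + 1 separator + 8 timestamp bytes = 21, plus the tail:
    System.out.println(qualifier(withTs, "foo").length); // 21 + 1 + 3 = 25
    System.out.println(qualifier(withTs, null).length);  // 21 + 1     = 22
  }
}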

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1a1a22a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 28a0b6a..c957bf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -124,4 +124,17 @@ public class TimelineWriterUtils {
     return segments;
   }
 
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
 }
\ No newline at end of file
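
The moved invert() helper is tiny, but its ordering property is the point of
the new key schema: HBase sorts row keys and column qualifiers in ascending
byte order, so storing Long.MAX_VALUE - timestamp makes newer timestamps sort
first. A standalone sketch of that property (the class name and sample values
are illustrative only):

public class InvertDemo {
  static long invert(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long earlier = 1436512802000L;  // sample event timestamp from the tests
    long later = earlier + 60_000L; // one minute later
    // Inversion reverses the ordering: the later event gets the smaller
    // inverted value, so an ascending scan returns it first.
    System.out.println(invert(later) < invert(earlier)); // true
    // Inverting twice is the identity, so a reader can recover the original
    // timestamp from a stored key.
    System.out.println(invert(invert(later)) == later);  // true
  }
}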

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1a1a22a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 61958c2..3e17ad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -47,7 +48,7 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
@@ -70,24 +71,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
             te.getId()));
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
-  /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
-   */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
-
 }
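
As the inline comment above notes ("flowRunId is a long, so we can't encode
them all at the same time"), the run id cannot go through the string-based
joinEncoded() with the text fields; it is written as a fixed-width 8-byte
big-endian value (inverted, so newer runs sort first). A hedged standalone
illustration, with java.nio.ByteBuffer standing in for HBase's
Bytes.toBytes(long):

import java.nio.ByteBuffer;

public class RowKeyLongDemo {
  public static void main(String[] args) {
    long flowRunId = 1009876543218L; // sample run id from the test below
    byte[] second = ByteBuffer.allocate(Long.BYTES)
        .putLong(Long.MAX_VALUE - flowRunId).array();
    // Always exactly 8 bytes regardless of the value, which keeps the row
    // key parseable by position when it is split back into components.
    System.out.println(second.length); // 8
  }
}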

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1a1a22a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 31cb5d2..fd5643d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -274,7 +274,7 @@ public class TestHBaseTimelineWriterImpl {
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
     assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
@@ -317,7 +317,6 @@ public class TestHBaseTimelineWriterImpl {
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       s.setStartRow(startRow);
-      s.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
       ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
 
@@ -331,24 +330,23 @@ public class TestHBaseTimelineWriterImpl {
           assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
               entity));
 
-          // check the events
-          NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
-              EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
           // there should be only one event
           assertEquals(1, eventsResult.size());
           // key name for the event
-          String valueKey = eventId + Separator.VALUES.getValue() + expKey;
-          for (Map.Entry<String, NavigableMap<Long, Object>> e :
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+                  Bytes.toBytes(expKey));
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
               eventsResult.entrySet()) {
             // the value key must match
             assertEquals(valueKey, e.getKey());
-            NavigableMap<Long, Object> value = e.getValue();
+            Object value = e.getValue();
             // there should be only one timestamp and value
-            assertEquals(1, value.size());
-            for (Map.Entry<Long, Object> e2: value.entrySet()) {
-              assertEquals(expTs, e2.getKey());
-              assertEquals(expVal, e2.getValue());
-            }
+            assertEquals(expVal, value.toString());
           }
         }
       }
@@ -360,6 +358,85 @@ public class TestHBaseTimelineWriterImpl {
     }
   }
 
+  @Test
+  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster_emptyeventkey";
+      String user = "user_emptyeventkey";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      s.setStartRow(startRow);
+      s.addFamily(EntityColumnFamily.INFO.getBytes());
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          byte[] compoundColumnQualifierWithTsBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
+                  null);
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
+              eventsResult.entrySet()) {
+            // the column qualifier key must match
+            assertEquals(valueKey, e.getKey());
+            Object value = e.getValue();
+            // value should be empty
+            assertEquals("", value.toString());
+          }
+        }
+      }
+      assertEquals(1, rowCount);
+
+    } finally {
+      hbi.stop();
+      hbi.close();
+    }
+  }
+
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
