YARN-3411. [Storage implementation] explore the native HBase write schema for 
storage (Vrushali C via sjlee)

(cherry picked from commit 7a3068854d27eadae1c57545988f5b2029bf119a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3edd630
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3edd630
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3edd630

Branch: refs/heads/YARN-2928
Commit: b3edd6301b4ec8c59928cca0d5a41c94fb23740d
Parents: e93fa60
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu May 21 14:11:01 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:47:12 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../collector/TimelineCollectorManager.java     |  19 +
 .../storage/EntityColumnDetails.java            | 110 ++++++
 .../storage/EntityColumnFamily.java             |  95 +++++
 .../storage/HBaseTimelineWriterImpl.java        | 225 ++++++++++++
 .../server/timelineservice/storage/Range.java   |  59 ++++
 .../storage/TimelineEntitySchemaConstants.java  |  71 ++++
 .../storage/TimelineSchemaCreator.java          | 231 +++++++++++++
 .../storage/TimelineWriterUtils.java            | 344 +++++++++++++++++++
 .../storage/TestHBaseTimelineWriterImpl.java    | 292 ++++++++++++++++
 10 files changed, 1448 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ec9abc9..3df01e9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -73,6 +73,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3634. TestMRTimelineEventHandling and TestApplication are broken. (
     Sangjin Lee via junping_du)
 
+    YARN-3411. [Storage implementation] explore the native HBase write schema
+    for storage (Vrushali C via sjlee)
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 953d9b7..d54715c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -59,6 +59,13 @@ public abstract class TimelineCollectorManager extends AbstractService {
     super.serviceInit(conf);
   }
 
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    if (writer != null) {
+      writer.start();
+    }
+  }
 
   // access to this map is synchronized with the map itself
   private final Map<ApplicationId, TimelineCollector> collectors =
@@ -147,4 +154,16 @@ public abstract class TimelineCollectorManager extends AbstractService {
     return collectors.containsKey(appId);
   }
 
+  @Override
+  protected void serviceStop() throws Exception {
+    if (collectors != null && collectors.size() > 0) {
+      for (TimelineCollector c : collectors.values()) {
+        c.stop();
+      }
+    }
+    if (writer != null) {
+      writer.close();
+    }
+    super.serviceStop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
new file mode 100644
index 0000000..2894c41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the info column family details, such as column names, types, and
+ * byte representations, for the
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * object that is stored in HBase. Also has utility functions for storing each
+ * of these to the backend.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+enum EntityColumnDetails {
+  ID(EntityColumnFamily.INFO, "id"),
+  TYPE(EntityColumnFamily.INFO, "type"),
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
+  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
+  PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
+  PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
+  PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
+
+  private final EntityColumnFamily columnFamily;
+  private final String value;
+  private final byte[] inBytes;
+
+  private EntityColumnDetails(EntityColumnFamily columnFamily, 
+      String value) {
+    this.columnFamily = columnFamily;
+    this.value = value;
+    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  byte[] getInBytes() {
+    return inBytes;
+  }
+
+  void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
+      throws IOException {
+    TimelineWriterUtils.store(rowKey, entityTable,
+        this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
+        null);
+  }
+
+  /**
+   * Stores event data with a column prefix.
+   */
+  void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
+      String key, Object inputValue) throws IOException {
+    TimelineWriterUtils.store(rowKey, entityTable,
+        this.columnFamily.getInBytes(),
+        // column prefix
+        TimelineWriterUtils.join(
+            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
+            this.getInBytes(), idBytes),
+        // column qualifier
+        Bytes.toBytes(key),
+        inputValue, null);
+  }
+
+  /**
+   * Stores related entities with a column prefix.
+   */
+  void store(byte[] rowKey, BufferedMutator entityTable, String key,
+      Set<String> inputValue) throws IOException {
+    TimelineWriterUtils.store(rowKey, entityTable,
+        this.columnFamily.getInBytes(),
+        // column prefix
+        this.getInBytes(),
+        // column qualifier
+        Bytes.toBytes(key),
+        // value
+        TimelineWriterUtils.getValueAsString(
+            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
+        // cell timestamp
+        null);
+  }
+
+  // TODO add a method that accepts a byte array,
+  // iterates over the enum and returns an enum from those bytes
+
+}
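
For illustration (not part of the patch): the prefixed store() above writes event cells into the info family under a column named e!<eventId>!<infoKey>. A minimal, self-contained sketch of that name assembly, restating TimelineWriterUtils.join for two components (class name and sample values are made up):

import org.apache.hadoop.hbase.util.Bytes;

public class EventColumnNameSketch {
  // simplified two-component version of TimelineWriterUtils.join
  static byte[] join(byte[] sep, byte[] a, byte[] b) {
    byte[] out = new byte[a.length + sep.length + b.length];
    System.arraycopy(a, 0, out, 0, a.length);
    System.arraycopy(sep, 0, out, a.length, sep.length);
    System.arraycopy(b, 0, out, a.length + sep.length, b.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] sep = Bytes.toBytes("!");             // ROW_KEY_SEPARATOR
    byte[] prefix = Bytes.toBytes("e");          // PREFIX_EVENTS
    byte[] eventId = Bytes.toBytes("start_event");
    byte[] infoKey = Bytes.toBytes("job_id");
    // store() joins the prefix and event id; the qualifier appends the info key
    byte[] column = join(sep, join(sep, prefix, eventId), infoKey);
    System.out.println(Bytes.toString(column));  // prints e!start_event!job_id
  }
}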

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
new file mode 100644
index 0000000..e556351
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the column family names and byte representations for the
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * object that is stored in HBase.
+ * Also has utility functions for storing each of these to the backend.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+enum EntityColumnFamily {
+  INFO("i"),
+  CONFIG("c"),
+  METRICS("m");
+
+  private final String value;
+  private final byte[] inBytes;
+
+  private EntityColumnFamily(String value) {
+    this.value = value;
+    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
+  }
+
+  byte[] getInBytes() {
+    return inBytes;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  /**
+   * Stores the key as the column qualifier and the value as the HBase cell
+   * value in this column family of the entity table.
+   *
+   * @param rowKey
+   * @param entityTable
+   * @param key
+   * @param inputValue
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
+      String inputValue) throws IOException {
+    if (key == null) {
+      return;
+    }
+    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
+        Bytes.toBytes(key), inputValue, null);
+  }
+
+  /**
+   * Stores the value along with the cell timestamp.
+   *
+   * @param rowKey
+   * @param entityTable
+   * @param key
+   * @param timestamp
+   * @param inputValue
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
+      Long timestamp, Number inputValue) throws IOException {
+    if (key == null) {
+      return;
+    }
+    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
+        Bytes.toBytes(key), inputValue, timestamp);
+  }
+
+  // TODO add a method that accepts a byte array,
+  // iterates over the enum and returns an enum from those bytes
+}
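
For illustration (not part of the patch): the METRICS store() above keeps a metric's time series as dated cell versions under a single qualifier. A condensed sketch of the same write, assuming an already-open BufferedMutator and a hypothetical helper class (the patch itself routes this through TimelineWriterUtils.store and CellUtil):

import java.io.IOException;

import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;

public final class MetricCellSketch {
  /** Writes one time-series point as a dated cell version under one qualifier. */
  static void storeMetricPoint(BufferedMutator table, byte[] rowKey,
      String metricId, long metricTime, Number value) throws IOException {
    Put p = new Put(rowKey);
    p.addColumn(Bytes.toBytes("m"),           // METRICS column family
        Bytes.toBytes(metricId),              // metric id as the qualifier
        metricTime,                           // metric time as the cell timestamp
        GenericObjectMapper.write(value));    // serialized the way the writer does
    table.mutate(p);                          // buffered; flushed on close()
  }
}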

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..aa71c6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
+
+/**
+ * This implements an HBase-based backend for storing application timeline
+ * entity information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+    TimelineWriter {
+
+  private Connection conn;
+  private BufferedMutator entityTable;
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
+  public HBaseTimelineWriterImpl() {
+    super(HBaseTimelineWriterImpl.class.getName());
+  }
+
+  public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
+    super(conf.get("yarn.application.id",
+        HBaseTimelineWriterImpl.class.getName()));
+  }
+
+  /**
+   * Initializes the HBase connection for writing to the entity table.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    Configuration hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    TableName entityTableName = TableName.valueOf(hbaseConf.get(
+        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
+        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
+    entityTable = conn.getBufferedMutator(entityTableName);
+  }
+
+  /**
+   * Stores the entire information in TimelineEntities to the timeline store.
+   */
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities data) throws IOException {
+
+    byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
+        userId, flowName, flowRunId, appId);
+
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    for (TimelineEntity te : data.getEntities()) {
+
+      // a set can have at most 1 null
+      if (te == null) {
+        continue;
+      }
+      // get row key
+      byte[] row = TimelineWriterUtils.join(
+          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
+          Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
+
+      storeInfo(row, te, flowVersion);
+      storeEvents(row, te.getEvents());
+      storeConfig(row, te.getConfigs());
+      storeMetrics(row, te.getMetrics());
+      storeRelations(row, te.getIsRelatedToEntities(),
+          EntityColumnDetails.PREFIX_IS_RELATED_TO);
+      storeRelations(row, te.getRelatesToEntities(),
+          EntityColumnDetails.PREFIX_RELATES_TO);
+    }
+
+    return putStatus;
+  }
+
+  /**
+   * Stores the relations from the {@linkplain TimelineEntity} object.
+   */
+  private void storeRelations(byte[] rowKey,
+      Map<String, Set<String>> connectedEntities,
+      EntityColumnDetails columnNamePrefix) throws IOException {
+    for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
+      columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
+          entry.getValue());
+    }
+  }
+
+  /**
+   * Stores information from the {@linkplain TimelineEntity} object
+   */
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
+      throws IOException {
+
+    EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
+    EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
+    EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
+        te.getCreatedTime());
+    EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
+        te.getModifiedTime());
+    EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
+  }
+
+  /**
+   * Stores the config information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeConfig(byte[] rowKey, Map<String, String> config)
+      throws IOException {
+    if (config == null) {
+      return;
+    }
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      EntityColumnFamily.CONFIG.store(rowKey, entityTable,
+          entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineMetric} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics)
+      throws IOException {
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics) {
+        String key = metric.getId();
+        Map<Long, Number> timeseries = metric.getValues();
+        for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
+          EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
+              entry.getKey(), entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the events from the {@linkplain TimelineEntity} object.
+   */
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events)
+      throws IOException {
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event != null) {
+          String id = event.getId();
+          if (id != null) {
+            byte[] idBytes = Bytes.toBytes(id);
+            Map<String, Object> eventInfo = event.getInfo();
+            if (eventInfo != null) {
+              for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+                EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
+                    entityTable, idBytes, info.getKey(), info.getValue());
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  /**
+   * Closes the HBase connections. The close APIs flush any buffered writes
+   * and release the resources held.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (entityTable != null) {
+      LOG.info("closing entity table");
+      // The close API performs flushing and releases any resources held
+      entityTable.close();
+    }
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+}
\ No newline at end of file
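
For illustration (not part of the patch): a condensed sketch of the writer lifecycle, with made-up cluster and flow identifiers. serviceInit opens the Connection and BufferedMutator, write() buffers Puts, and stop() flushes them via the close calls above; the unit test at the end of this patch runs the same sequence against a mini-cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;

public class WriterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl(conf);
    writer.init(conf);   // opens the HBase Connection and BufferedMutator
    writer.start();
    TimelineEntities entities = new TimelineEntities();
    // ... populate entities as the unit test below does ...
    writer.write("cluster1", "user1", "flow1", "v1", 1L, "app1", entities);
    writer.stop();       // flushes buffered mutations, closes table and connection
  }
}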

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
new file mode 100644
index 0000000..2a2db81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Range {
+  private final int startIdx;
+  private final int endIdx;
+
+  /**
+   * Defines a range from start index (inclusive) to end index (exclusive).
+   *
+   * @param start
+   *          Starting index position
+   * @param end
+   *          Ending index position (exclusive)
+   */
+  public Range(int start, int end) {
+    if (start < 0 || end < start) {
+      throw new IllegalArgumentException(
+          "Invalid range, required that: 0 <= start <= end; start=" + start
+              + ", end=" + end);
+    }
+
+    this.startIdx = start;
+    this.endIdx = end;
+  }
+
+  public int start() {
+    return startIdx;
+  }
+
+  public int end() {
+    return endIdx;
+  }
+
+  public int length() {
+    return endIdx - startIdx;
+  }
+}
\ No newline at end of file
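
For illustration (not part of the patch): Range is consumed by splitRanges in TimelineWriterUtils (added further below), which returns [start, end) ranges over the source bytes, and callers copy each segment out. A small sketch with a made-up row key fragment:

import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriterUtils;

public class RangeSketch {
  public static void main(String[] args) {
    byte[] source = Bytes.toBytes("user1!cluster1!flow1");
    List<Range> ranges = TimelineWriterUtils.splitRanges(source,
        Bytes.toBytes("!"));
    for (Range r : ranges) {
      // copy out each [start, end) segment without re-scanning the source
      System.out.println(Bytes.toString(source, r.start(), r.length()));
    }
    // prints: user1, cluster1, flow1 (one per line)
  }
}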

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
new file mode 100644
index 0000000..d95cbb2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineEntitySchemaConstants {
+
+  /** entity prefix */
+  public static final String ENTITY_PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX
+      + ".entity";
+
+  /** config param name that specifies the entity table name */
+  public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
+      + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * entity table
+   */
+  public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
+      + ".table.metrics.ttl";
+
+  /** default value for entity table name */
+  public static final String DEFAULT_ENTITY_TABLE_NAME =
+      "timelineservice.entity";
+
+  /** byte representation of the default entity table name */
+  static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
+      .toBytes(DEFAULT_ENTITY_TABLE_NAME);
+
+  /** separator in row key */
+  public static final String ROW_KEY_SEPARATOR = "!";
+
+  /** byte representation of the separator in row key */
+  static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
+      .toBytes(ROW_KEY_SEPARATOR);
+
+  public static final byte ZERO_BYTES = 0;
+
+  /** default TTL for the metrics timeseries, in seconds (30 days) */
+  public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
+
+  /** default max number of versions */
+  public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..820a6d1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * This creates the schema for an HBase-based backend for storing application
+ * timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineSchemaCreator {
+
+  final static String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Log LOG =
+      LogFactory.getLog(TimelineSchemaCreator.class);
+  final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
+      Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
+      Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
+      Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
+      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
+      Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
+      Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
+      Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
+      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
+      Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
+
+  public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
+
+  public static void main(String[] args) throws Exception {
+
+    Configuration hbaseConf = HBaseConfiguration.create();
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    // Grab the entityTableName argument
+    String entityTableName = commandLine.getOptionValue("e");
+    if (StringUtils.isNotBlank(entityTableName)) {
+      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
+          entityTableName);
+    }
+    String entityTableTTLMetrics = commandLine.getOptionValue("m");
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
+          entityTableTTLMetrics);
+    }
+    createAllTables(hbaseConf);
+  }
+
+  /**
+   * Parse command-line arguments.
+   *
+   * @param args
+   *          command line arguments passed to program.
+   * @return parsed command line.
+   * @throws ParseException
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    Option o = new Option("e", "entityTableName", true, "entity table name");
+    o.setArgName("entityTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option("m", "metricsTTL", true, "TTL for metrics column family");
+    o.setArgName("metricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (Exception e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  private static void createAllTables(Configuration hbaseConf)
+      throws IOException {
+
+    Connection conn = null;
+    try {
+      conn = ConnectionFactory.createConnection(hbaseConf);
+      Admin admin = conn.getAdmin();
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
+      createTimelineEntityTable(admin, hbaseConf);
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a table with the column families info, config and metrics:
+   * info stores information about a timeline entity object,
+   * config stores the configuration data of a timeline entity object, and
+   * metrics stores the metrics of a timeline entity object.
+   *
+   * Example entity table record:
+   * <pre>
+   *|------------------------------------------------------------|
+   *|  Row       | Column Family  | Column Family | Column Family|
+   *|  key       | info           | metrics       | config       |
+   *|------------------------------------------------------------|
+   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
+   *| clusterId! |                | metricValue1  | configValue1 |
+   *| flowId!    | type:entityType| @timestamp1   |              |
+   *| flowRunId! |                |               | configKey2:  |
+   *| AppId!     | created_time:  | metricName1:  | configValue2 |
+   *| entityType!| 1392993084018  | metricValue2  |              |
+   *| entityId   |                | @timestamp2   |              |
+   *|            | modified_time: |               |              |
+   *|            | 1392995081012  | metricName2:  |              |
+   *|            |                | metricValue1  |              |
+   *|            | r!relatesToKey:| @timestamp2   |              |
+   *|            | id3!id4!id5    |               |              |
+   *|            |                |               |              |
+   *|            | s!isRelatedToKey|              |              |
+   *|            | id7!id9!id5    |               |              |
+   *|            |                |               |              |
+   *|            | e!eventKey:    |               |              |
+   *|            | eventValue     |               |              |
+   *|            |                |               |              |
+   *|            | flowVersion:   |               |              |
+   *|            | versionValue   |               |              |
+   *|------------------------------------------------------------|
+   *</pre>
+   * @param admin
+   * @param hbaseConf
+   * @throws IOException
+   */
+  public static void createTimelineEntityTable(Admin admin,
+      Configuration hbaseConf) throws IOException {
+
+    TableName table = TableName.valueOf(hbaseConf.get(
+        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
+        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor cf1 = new HColumnDescriptor(
+        EntityColumnFamily.INFO.getInBytes());
+    cf1.setBloomFilterType(BloomType.ROWCOL);
+    entityTableDescp.addFamily(cf1);
+
+    HColumnDescriptor cf2 = new HColumnDescriptor(
+        EntityColumnFamily.CONFIG.getInBytes());
+    cf2.setBloomFilterType(BloomType.ROWCOL);
+    cf2.setBlockCacheEnabled(true);
+    entityTableDescp.addFamily(cf2);
+
+    HColumnDescriptor cf3 = new HColumnDescriptor(
+        EntityColumnFamily.METRICS.getInBytes());
+    entityTableDescp.addFamily(cf3);
+    cf3.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    cf3.setMinVersions(1);
+    cf3.setMaxVersions(
+        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT);
+    cf3.setTimeToLive(hbaseConf.getInt(
+        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
+        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT));
+    entityTableDescp.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(entityTableDescp, splits);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+
+  }
+}
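
For illustration (not part of the patch): besides the main() entry point, the table can be created programmatically, as the unit test below does. A sketch assuming a reachable HBase cluster (the overrides mirror the -e and -m command-line options, and createTimelineEntityTable throws if the table already exists):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;

public class SchemaSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
        "timelineservice.entity");                 // -e equivalent
    conf.setInt(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
        2592000);                                  // -m equivalent, in seconds
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      TimelineSchemaCreator.createTimelineEntityTable(admin, conf);
    }
  }
}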

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
new file mode 100644
index 0000000..113935e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
+
+/**
+ * A collection of utility functions used across the timeline writer classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineWriterUtils {
+
+  /** empty bytes */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+  private static final String SPACE = " ";
+  private static final String UNDERSCORE = "_";
+  private static final String EMPTY_STRING = "";
+
+  /**
+   * Returns a single byte array containing all of the individual component
+   * arrays separated by the separator array.
+   *
+   * @param separator
+   * @param components
+   * @return byte array after joining the components
+   */
+  public static byte[] join(byte[] separator, byte[]... components) {
+    if (components == null || components.length == 0) {
+      return EMPTY_BYTES;
+    }
+
+    int finalSize = 0;
+    if (separator != null) {
+      finalSize = separator.length * (components.length - 1);
+    }
+    for (byte[] comp : components) {
+      if (comp != null) {
+        finalSize += comp.length;
+      }
+    }
+
+    byte[] buf = new byte[finalSize];
+    int offset = 0;
+    for (int i = 0; i < components.length; i++) {
+      if (components[i] != null) {
+        System.arraycopy(components[i], 0, buf, offset, components[i].length);
+        offset += components[i].length;
+        if (i < (components.length - 1) && separator != null
+            && separator.length > 0) {
+          System.arraycopy(separator, 0, buf, offset, separator.length);
+          offset += separator.length;
+        }
+      }
+    }
+    return buf;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @return byte[][] array after splitting the source
+   */
+  public static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, -1);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of limit items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @param limit
+   * @return byte[][] after splitting the input source
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+    return splitRanges(source, separator, -1);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator,
+      int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    if ((source == null) || (separator == null)) {
+      return segments;
+    }
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (i + j >= source.length || source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > 0 && segments.size() >= (limit - 1)) {
+        // everything else goes in one final segment
+        break;
+      }
+
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * Converts the run id into its inverse, so that more recent runs sort first.
+   * @param flowRunId
+   * @return inverted long
+   */
+  public static long encodeRunId(Long flowRunId) {
+    return Long.MAX_VALUE - flowRunId;
+  }
+
+  /**
+   * Returns a value from the map as a String.
+   * @param key
+   * @param values
+   * @return value as a String or ""
+   * @throws IOException 
+   */
+  public static String getValueAsString(final byte[] key,
+      final Map<byte[], byte[]> values) throws IOException {
+    if (values == null) {
+      return EMPTY_STRING;
+    }
+    byte[] value = values.get(key);
+    if (value != null) {
+      return GenericObjectMapper.read(value).toString();
+    } else {
+      return EMPTY_STRING;
+    }
+  }
+
+  /**
+   * Returns a value from the map as a long.
+   * @param key
+   * @param values
+   * @return value as Long or 0L
+   * @throws IOException 
+   */
+  public static long getValueAsLong(final byte[] key,
+      final Map<byte[], byte[]> values) throws IOException {
+    if (values == null) {
+      return 0;
+    }
+    byte[] value = values.get(key);
+    if (value != null) {
+      Number val = (Number) GenericObjectMapper.read(value);
+      return val.longValue();
+    } else {
+      return 0L;
+    }
+  }
+
+  /**
+   * Concatenates the values from a Set<String> into a single delimited
+   * string value.
+   * @param rowKeySeparator
+   * @param values
+   * @return the set of strings as a single delimited string
+   */
+  public static String getValueAsString(String rowKeySeparator,
+      Set<String> values) {
+
+    if (values == null) {
+      return EMPTY_STRING;
+    }
+    StringBuilder concatStrings = new StringBuilder();
+    for (String value : values) {
+      concatStrings.append(value);
+      concatStrings.append(rowKeySeparator);
+    }
+    // remove the last separator
+    if (concatStrings.length() > 1) {
+      concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
+    }
+    return concatStrings.toString();
+  }
+  /**
+   * Constructs a row key prefix for the entity table
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @return byte array with the row key prefix
+   */
+  static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
+      Long flowRunId, String appId) {
+    return TimelineWriterUtils.join(
+        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
+        Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)),
+        Bytes.toBytes(cleanse(flowId)),
+        Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
+        Bytes.toBytes(cleanse(appId)));
+  }
+
+  /**
+   * Takes a string token to be used as a key or qualifier and
+   * cleanses out reserved tokens.
+   * This operation is not symmetrical.
+   * Logic is to replace all spaces and separator chars in input with
+   * underscores.
+   *
+   * @param token token to cleanse.
+   * @return String with no spaces and no separator chars
+   */
+  public static String cleanse(String token) {
+    if (token == null || token.length() == 0) {
+      return token;
+    }
+
+    String cleansed = token.replaceAll(SPACE, UNDERSCORE);
+    cleansed = cleansed.replaceAll(
+        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE);
+
+    return cleansed;
+  }
+
+  /**
+   * Stores the given information in the HBase table.
+   * 
+   * @param rowKey
+   * @param table
+   * @param columnFamily
+   * @param columnPrefix
+   * @param columnQualifier
+   * @param inputValue
+   * @param cellTimeStamp
+   * @throws IOException
+   */
+  public static void store(byte[] rowKey, BufferedMutator table,
+      byte[] columnFamily, byte[] columnPrefix, byte[] columnQualifier,
+      Object inputValue, Long cellTimeStamp) throws IOException {
+    if ((rowKey == null) || (table == null) || (columnFamily == null)
+        || (columnQualifier == null) || (inputValue == null)) {
+      return;
+    }
+
+    Put p = null;
+    if (cellTimeStamp == null) {
+      if (columnPrefix != null) {
+        // store with prefix
+        p = new Put(rowKey);
+        p.addColumn(
+            columnFamily,
+            join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
+                columnPrefix, columnQualifier), GenericObjectMapper
+                .write(inputValue));
+      } else {
+        // store without prefix
+        p = new Put(rowKey);
+        p.addColumn(columnFamily, columnQualifier,
+            GenericObjectMapper.write(inputValue));
+      }
+    } else {
+      // store with cell timestamp
+      Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier,
+          // set the cell timestamp
+          cellTimeStamp,
+          // KeyValue Type minimum
+          TimelineEntitySchemaConstants.ZERO_BYTES,
+          GenericObjectMapper.write(inputValue));
+      p = new Put(rowKey);
+      p.add(cell);
+    }
+    if (p != null) {
+      table.mutate(p);
+    }
+
+  }
+
+}
\ No newline at end of file
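
For illustration (not part of the patch): encodeRunId stores Long.MAX_VALUE - flowRunId because HBase sorts row keys ascending, so the inverted run id makes the most recent flow run sort first within a flow's rows. A minimal check with made-up run ids:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriterUtils;

public class RunIdOrderingSketch {
  public static void main(String[] args) {
    long olderRun = 1002345678919L;
    long newerRun = 1002345999999L;
    byte[] olderKey = Bytes.toBytes(TimelineWriterUtils.encodeRunId(olderRun));
    byte[] newerKey = Bytes.toBytes(TimelineWriterUtils.encodeRunId(newerRun));
    // the newer run encodes to the smaller value, so it sorts first in a scan
    System.out.println(Bytes.compareTo(newerKey, olderKey) < 0);  // true
  }
}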

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3edd630/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..48bacd6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for HBaseTimelineWriterImpl (YARN-3411).
+ */
+public class TestHBaseTimelineWriterImpl {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(),
+        util.getConfiguration());
+  }
+
+  @Test
+  public void testWriteEntityToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(1429741609000L, 100000000);
+    metricValues.put(1429742609000L, 200000000);
+    metricValues.put(1429743609000L, 300000000);
+    metricValues.put(1429744609000L, 400000000);
+    metricValues.put(1429745609000L, 50000000000L);
+    metricValues.put(1429746609000L, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster1";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.stop();
+
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user,
+          flow, runid, appName);
+      s.setStartRow(startRow);
+      s.setMaxVersions(Integer.MAX_VALUE);
+      ResultScanner scanner = null;
+      TableName entityTableName = TableName
+          .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Table entityTable = conn.getTable(entityTableName);
+      int rowCount = 0;
+      int colCount = 0;
+      scanner = entityTable.getScanner(s);
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+          colCount += result.size();
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          // check info column family
+          NavigableMap<byte[], byte[]> infoValues = result
+              .getFamilyMap(EntityColumnFamily.INFO.getInBytes());
+          String id1 = TimelineWriterUtils.getValueAsString(
+              EntityColumnDetails.ID.getInBytes(), infoValues);
+          assertEquals(id, id1);
+          String type1 = TimelineWriterUtils.getValueAsString(
+              EntityColumnDetails.TYPE.getInBytes(), infoValues);
+          assertEquals(type, type1);
+          Long cTime1 = TimelineWriterUtils.getValueAsLong(
+              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+          assertEquals(cTime1, cTime);
+          Long mTime1 = TimelineWriterUtils.getValueAsLong(
+              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+          assertEquals(mTime1, mTime);
+          checkRelatedEntities(isRelatedTo, infoValues,
+              EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes());
+          checkRelatedEntities(relatesTo, infoValues,
+              EntityColumnDetails.PREFIX_RELATES_TO.getInBytes());
+
+          // check config column family
+          NavigableMap<byte[], byte[]> configValuesResult = result
+              .getFamilyMap(EntityColumnFamily.CONFIG.getInBytes());
+          checkConfigs(configValuesResult, conf);
+
+          NavigableMap<byte[], byte[]> metricsResult = result
+              .getFamilyMap(EntityColumnFamily.METRICS.getInBytes());
+          checkMetricsSizeAndKey(metricsResult, metrics);
+          List<Cell> metricCells = result.getColumnCells(
+              EntityColumnFamily.METRICS.getInBytes(),
+              Bytes.toBytes(m1.getId()));
+          checkMetricsTimeseries(metricCells, m1);
+        }
+      }
+      scanner.close();
+      conn.close();
+      assertEquals(1, rowCount);
+      assertEquals(15, colCount);
+
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
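+  /**
+   * Each metric value is stored as a separate cell version; verify that the
+   * cell timestamps and deserialized values match the written time series.
+   */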
+  private void checkMetricsTimeseries(List<Cell> metricCells,
+      TimelineMetric m1) throws IOException {
+    Map<Long, Number> timeseries = m1.getValues();
+    assertEquals(metricCells.size(), timeseries.size());
+    for (Cell c1 : metricCells) {
+      assertTrue(timeseries.containsKey(c1.getTimestamp()));
+      assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)),
+          timeseries.get(c1.getTimestamp()));
+    }
+  }
+
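+  /**
+   * Verify the metrics family contains exactly one column per written
+   * metric, keyed by the metric id.
+   */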
+  private void checkMetricsSizeAndKey(
+      NavigableMap<byte[], byte[]> metricsResult, Set<TimelineMetric> metrics) {
+    assertEquals(metrics.size(), metricsResult.size());
+    for (TimelineMetric m1 : metrics) {
+      byte[] key = Bytes.toBytes(m1.getId());
+      assertTrue(metricsResult.containsKey(key));
+    }
+  }
+
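+  /**
+   * Verify every config key is stored as its own column and that the value
+   * deserializes back to the original string.
+   */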
+  private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
+      Map<String, String> conf) throws IOException {
+
+    assertEquals(conf.size(), configValuesResult.size());
+    byte[] columnName;
+    for (String key : conf.keySet()) {
+      columnName = Bytes.toBytes(key);
+      assertTrue(configValuesResult.containsKey(columnName));
+      byte[] value = configValuesResult.get(columnName);
+      assertNotNull(value);
+      assertEquals(conf.get(key), GenericObjectMapper.read(value));
+    }
+  }
+
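+  /**
+   * Verify each relation key is stored as a prefixed column in the info
+   * family whose value holds the related entity ids.
+   */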
+  private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
+      NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix)
+      throws IOException {
+
+    for (String key : isRelatedTo.keySet()) {
+      byte[] columnName = TimelineWriterUtils.join(
+          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
+          Bytes.toBytes(key));
+
+      byte[] value = infoValues.get(columnName);
+      assertNotNull(value);
+      String isRelatedToEntities = GenericObjectMapper.read(value).toString();
+      assertNotNull(isRelatedToEntities);
+      assertEquals(
+          TimelineWriterUtils.getValueAsString(
+              TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR,
+              isRelatedTo.get(key)), isRelatedToEntities);
+    }
+  }
+
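+  /**
+   * Split the row key on the separator and check its expected layout:
+   * user, cluster, flow, encoded run id, cleansed app name, entity type,
+   * entity id.
+   */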
+  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid, String appName, TimelineEntity te) {
+
+    byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
+        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES);
+
+    assertTrue(rowKeyComponents.length == 7);
+    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.encodeRunId(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.cleanse(appName),
+        Bytes.toString(rowKeyComponents[4]));
+    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
+    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    return true;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
