http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..1bb77a0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This implements a local file based backend for storing application timeline
+ * information. This implementation may not provide all the necessary
+ * features. It is provided solely for basic testing purposes, and should not
+ * be used in a non-test situation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileSystemTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private String outputRoot;
+
+  /** Config param for the local file system storage root directory (YARN-3264). */
+  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
+      = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  /** default value for storage location on local disk. */
+  public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+      = "/tmp/timeline_service_data";
+
+  public static final String ENTITIES_DIR = "entities";
+
+  /** Default extension for output files. */
+  public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  FileSystemTimelineWriterImpl() {
+    super(FileSystemTimelineWriterImpl.class.getName());
+  }
+
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    for (TimelineEntity entity : entities.getEntities()) {
+      write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
+          response);
+    }
+    return response;
+  }
+
+  private synchronized void write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRun, String appId,
+      TimelineEntity entity, TimelineWriteResponse response)
+      throws IOException {
+    PrintWriter out = null;
+    try {
+      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
+          escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
+          entity.getType());
+      String fileName = dir + entity.getId() +
+          TIMELINE_SERVICE_STORAGE_EXTENSION;
+      out =
+          new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+              new FileOutputStream(fileName, true), "UTF-8")));
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    } catch (IOException ioe) {
+      TimelineWriteError error = new TimelineWriteError();
+      error.setEntityId(entity.getId());
+      error.setEntityType(entity.getType());
+      /*
+       * TODO: set an appropriate error code after PoC could possibly be:
+       * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
+       */
+      response.addError(error);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  public String getOutputRoot() {
+    return outputRoot;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    mkdirs(outputRoot, ENTITIES_DIR);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // no op
+  }
+
+  private static String mkdirs(String... dirStrs) throws IOException {
+    StringBuilder path = new StringBuilder();
+    for (String dirStr : dirStrs) {
+      path.append(dirStr).append('/');
+      File dir = new File(path.toString());
+      if (!dir.exists()) {
+        if (!dir.mkdirs()) {
+          throw new IOException("Could not create directories for " + dir);
+        }
+      }
+    }
+    return path.toString();
+  }
+
+  // specifically escape the separator character
+  private static String escape(String str) {
+    return str.replace(File.separatorChar, '_');
+  }
+}
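
A minimal usage sketch for this writer (not part of the patch): it assumes code running in the same Java package, since the constructor is package-private, with the imports shown in the file above; all ids are made up.

    // Configure the storage root, write one entity, and note where it lands.
    Configuration conf = new Configuration();
    conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
        "/tmp/timeline_test");
    FileSystemTimelineWriterImpl writer = new FileSystemTimelineWriterImpl();
    writer.init(conf);
    writer.start();

    TimelineEntity entity = new TimelineEntity();
    entity.setId("entity_1");
    entity.setType("TEST_TYPE");
    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(entity);

    // The entity is appended as one JSON line under
    // <root>/entities/<cluster>/<user>/<flow>/<version>/<run>/<app>/<type>/entity_1.thist
    writer.write("cluster_1", "user_1", "flow_1", "v1", 1L, "app_1", entities);
    writer.stop();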

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..a384a84
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+
+/**
+ * HBase based implementation for {@link TimelineReader}.
+ */
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(context,
+            dataToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
+            filters, dataToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
+  }
+}
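
A minimal sketch of using the reader (not part of the patch): it assumes a reachable HBase cluster, the imports shown in the file above, and a TimelineReaderContext constructor taking (clusterId, userId, flowName, flowRunId, appId, entityType, entityId); all ids are made up.

    // Fetch a single application entity from the HBase-backed store.
    HBaseTimelineReaderImpl reader = new HBaseTimelineReaderImpl();
    reader.init(new Configuration());
    reader.start();

    TimelineReaderContext context = new TimelineReaderContext("cluster_1",
        "user_1", "flow_1", 1L, "application_1111111111111_0001",
        TimelineEntityType.YARN_APPLICATION.toString(), null);
    // An empty TimelineDataToRetrieve requests only the default fields.
    TimelineEntity entity =
        reader.getEntity(context, new TimelineDataToRetrieve());
    reader.stop();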

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..3511a2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+/**
+ * This implements an HBase-based backend for storing timeline entity
+ * information. It writes to multiple tables in the backend.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+    TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
+  private Connection conn;
+  private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
+
+  /**
+   * Used to convert string key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+  public HBaseTimelineWriterImpl() {
+    super(HBaseTimelineWriterImpl.class.getName());
+  }
+
+  public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
+    super(conf.get("yarn.application.id",
+        HBaseTimelineWriterImpl.class.getName()));
+  }
+
+  /**
+   * Initializes the HBase connection and the buffered mutators for the
+   * tables this writer writes to.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    Configuration hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
+    applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+    flowActivityTable =
+        new FlowActivityTable().getTableMutator(hbaseConf, conn);
+  }
+
+  /**
+   * Stores all the information contained in {@link TimelineEntities} to the
+   * timeline store.
+   */
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities data) throws IOException {
+
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    // defensive coding to avoid NPE during row key construction
+    if ((flowName == null) || (appId == null) || (clusterId == null)
+        || (userId == null)) {
+      LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + 
appId
+          + " userId=" + userId + " clusterId=" + clusterId
+          + " . Not proceeding with writing to hbase");
+      return putStatus;
+    }
+
+    for (TimelineEntity te : data.getEntities()) {
+
+      // a set can have at most 1 null
+      if (te == null) {
+        continue;
+      }
+
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = isApplicationEntity(te);
+      byte[] rowKey;
+      if (isApplication) {
+        ApplicationRowKey applicationRowKey =
+            new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+                appId);
+        rowKey = applicationRowKey.getRowKey();
+      } else {
+        EntityRowKey entityRowKey =
+            new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+                te.getType(), te.getId());
+        rowKey = entityRowKey.getRowKey();
+      }
+
+      storeInfo(rowKey, te, flowVersion, isApplication);
+      storeEvents(rowKey, te.getEvents(), isApplication);
+      storeConfig(rowKey, te.getConfigs(), isApplication);
+      storeMetrics(rowKey, te.getMetrics(), isApplication);
+      storeRelations(rowKey, te, isApplication);
+
+      if (isApplication) {
+        TimelineEvent event =
+            getApplicationEvent(te,
+                ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        FlowRunRowKey flowRunRowKey =
+            new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+        if (event != null) {
+          AppToFlowRowKey appToFlowRowKey =
+              new AppToFlowRowKey(clusterId, appId);
+          onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
+              flowVersion, te, event.getTimestamp());
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
+        // if application has finished, store its finish time and write final
+        // values of all metrics
+        event = getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
+          onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+              event.getTimestamp());
+        }
+      }
+    }
+    return putStatus;
+  }
+
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      AppToFlowRowKey appToFlowRowKey, String appId, String userId,
+      String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
+      throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
+    // store in App to flow table
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
+
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+            .getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * updates the {@link FlowRunTable} with Application Created information
+   */
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+        te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+
+  /*
+   * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished
+   */
+  private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+      String flowVersion, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+        appFinishedTimeStamp);
+
+    // indicate in the flow activity table that the app has finished
+    byte[] rowKey =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appFinishedTimeStamp, flowRunRowKey.getUserId(),
+            flowRunRowKey.getFlowName()).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * Update the {@link FlowRunTable} with Application Finished information
+   */
+  private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te, long appFinishedTimeStamp)
+      throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
+    FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+        appFinishedTimeStamp, attributeAppId);
+
+    // store the final value of metrics since application has finished
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Metrics
+   */
+  private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = flowRunRowKey.getRowKey();
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
+  }
+
+  private void storeRelations(byte[] rowKey, TimelineEntity te,
+      boolean isApplication) throws IOException {
+    if (isApplication) {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+    } else {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+    }
+  }
+
+  /**
+   * Stores the relations from the {@linkplain TimelineEntity} object.
+   */
+  private <T> void storeRelations(byte[] rowKey,
+      Map<String, Set<String>> connectedEntities,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+          throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // id3?id4?id5
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+      columnPrefix.store(rowKey, table,
+          stringKeyConverter.encode(connectedEntity.getKey()), null,
+          compoundValue);
+    }
+  }
+
+  /**
+   * Stores information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
+      boolean isApplication) throws IOException {
+
+    if (isApplication) {
+      ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
+      ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+          te.getCreatedTime());
+      ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
+          flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    } else {
+      EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+      EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+      EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
+          te.getCreatedTime());
+      EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          EntityColumnPrefix.INFO.store(rowKey, entityTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the config information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeConfig(byte[] rowKey, Map<String, String> config,
+      boolean isApplication) throws IOException {
+    if (config == null) {
+      return;
+    }
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      byte[] configKey = stringKeyConverter.encode(entry.getKey());
+      if (isApplication) {
+        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
+            configKey, null, entry.getValue());
+      } else {
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
+            null, entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineMetric} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      boolean isApplication) throws IOException {
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics) {
+        byte[] metricColumnQualifier =
+            stringKeyConverter.encode(metric.getId());
+        Map<Long, Number> timeseries = metric.getValues();
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          if (isApplication) {
+            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          } else {
+            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the events from the {@linkplain TimelineEntity} object.
+   */
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      boolean isApplication) throws IOException {
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event != null) {
+          String eventId = event.getId();
+          if (eventId != null) {
+            long eventTimestamp = event.getTimestamp();
+            // if the timestamp is not set, use the current timestamp
+            if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+              LOG.warn("timestamp is not set for event " + eventId +
+                  "! Using the current timestamp");
+              eventTimestamp = System.currentTimeMillis();
+            }
+            Map<String, Object> eventInfo = event.getInfo();
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              byte[] columnQualifierBytes =
+                  new EventColumnName(eventId, eventTimestamp, null)
+                      .getColumnQualifier();
+              if (isApplication) {
+                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              } else {
+                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              }
+            } else {
+              for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+                // eventId=infoKey
+                byte[] columnQualifierBytes =
+                    new EventColumnName(eventId, eventTimestamp, info.getKey())
+                        .getColumnQualifier();
+                if (isApplication) {
+                  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                      columnQualifierBytes, null, info.getValue());
+                } else {
+                  EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                      columnQualifierBytes, null, info.getValue());
+                }
+              } // for info: eventInfo
+            }
+          }
+        }
+      } // event : events
+    }
+  }
+
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te TimelineEntity object.
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * Fetches the event with the given id from an application entity.
+   *
+   * @param te TimelineEntity object.
+   * @param eventId id of the event to fetch
+   * @return the TimelineEvent if the TimelineEntity contains the desired
+   *     event, null otherwise.
+   */
+  private static TimelineEvent getApplicationEvent(TimelineEntity te,
+      String eventId) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId().equals(eventId)) {
+          return event;
+        }
+      }
+    }
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage
+   * .TimelineWriter#aggregate
+   * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+   * org.apache
+   * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+   * ()
+   */
+  @Override
+  public void flush() throws IOException {
+    // flush all buffered mutators
+    entityTable.flush();
+    appToFlowTable.flush();
+    applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
+  }
+
+  /**
+   * Closes the HBase connections. The close APIs perform flushing and
+   * release any resources held.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (entityTable != null) {
+      LOG.info("closing the entity table");
+      // The close API performs flushing and releases any resources held
+      entityTable.close();
+    }
+    if (appToFlowTable != null) {
+      LOG.info("closing the app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flowActivityTable table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+}
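
A minimal sketch of driving this writer (not part of the patch): it assumes a reachable HBase cluster with the timeline schema created, uses the imports shown in the file above, and uses made-up ids and timestamps.

    // Write an application entity carrying a "created" event, so that the
    // writer also updates the app-to-flow, flow run and flow activity tables.
    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
    writer.init(new Configuration());
    writer.start();

    TimelineEntity app = new TimelineEntity();
    app.setId("application_1111111111111_0001");
    app.setType(TimelineEntityType.YARN_APPLICATION.toString());
    app.setCreatedTime(1425016501000L);

    TimelineEvent created = new TimelineEvent();
    created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
    created.setTimestamp(1425016501000L);
    app.addEvent(created);

    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(app);
    writer.write("cluster_1", "user_1", "flow_1", "v1", 1L,
        "application_1111111111111_0001", entities);
    writer.flush();
    writer.stop();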

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
new file mode 100644
index 0000000..1484f22
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+
+import java.io.IOException;
+
+/**
+ * YARN timeline service v2 offline aggregation storage interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class OfflineAggregationWriter extends AbstractService {
+
+  /**
+   * Construct the offline writer.
+   *
+   * @param name service name
+   */
+  public OfflineAggregationWriter(String name) {
+    super(name);
+  }
+
+  /**
+   * Persist aggregated timeline entities to the offline store based on which
+   * track the entities are to be rolled up to. The tracks along which
+   * aggregations are to be done are given by {@link OfflineAggregationInfo}.
+   *
+   * @param context a {@link TimelineCollectorContext} object that describes
+   *                the context information of the aggregated data. Depending
+   *                on the type of the aggregation, some fields of this
+   *                context may be empty or null.
+   * @param entities {@link TimelineEntities} to be persisted.
+   * @param info an {@link OfflineAggregationInfo} object that describes the
+   *             details of the aggregation. The currently supported option is
+   *             {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException if any problem occurs while writing aggregated
+   *     entities.
+   */
+  abstract TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context, TimelineEntities entities,
+      OfflineAggregationInfo info) throws IOException;
+}
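
For illustration only, a sketch of the shape a concrete subclass takes (hypothetical class name; the Phoenix implementation below is the real one in this patch). It must live in the same package because writeAggregatedEntity is package-private.

    class LoggingOfflineAggregationWriter extends OfflineAggregationWriter {
      LoggingOfflineAggregationWriter() {
        super(LoggingOfflineAggregationWriter.class.getName());
      }

      @Override
      TimelineWriteResponse writeAggregatedEntity(
          TimelineCollectorContext context, TimelineEntities entities,
          OfflineAggregationInfo info) throws IOException {
        // A real writer would persist each entity along the aggregation
        // track described by info; this stub only reports the batch size.
        System.out.println(entities.getEntities().size()
            + " aggregated entities for " + info.getTableName());
        return new TimelineWriteResponse();
      }
    }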

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
new file mode 100644
index 0000000..130cb6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.phoenix.util.PropertiesUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Offline aggregation Phoenix storage. This storage currently consists of two
+ * aggregation tables, one for flow level aggregation and one for user level
+ * aggregation.
+ *
+ * Example table record:
+ *
+ * <pre>
+ * |---------------------------|
+ * |  Primary   | Column Family|
+ * |  key       | metrics      |
+ * |---------------------------|
+ * | row_key    | metricId1:   |
+ * |            | metricValue1 |
+ * |            | @timestamp1  |
+ * |            |              |
+ * |            | metricId1:   |
+ * |            | metricValue2 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            | metricId2:   |
+ * |            | metricValue1 |
+ * |            | @timestamp2  |
+ * |---------------------------|
+ * </pre>
+ *
+ * For the flow aggregation table, the primary key contains user, cluster, and
+ * flow name. For the user aggregation table, the primary key contains user
+ * and cluster.
+ *
+ * Metrics column family stores all aggregated metrics for each record.
+ */
+@Private
+@Unstable
+public class PhoenixOfflineAggregationWriterImpl
+    extends OfflineAggregationWriter {
+
+  private static final Log LOG
+      = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
+  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
+      = "timeline_cf_placeholder";
+
+  /** Default Phoenix JDBC driver name. */
+  private static final String DRIVER_CLASS_NAME
+      = "org.apache.phoenix.jdbc.PhoenixDriver";
+
+  /** Default Phoenix timeline metric column family. */
+  private static final String METRIC_COLUMN_FAMILY = "m.";
+  /** Default Phoenix timeline info column family. */
+  private static final String INFO_COLUMN_FAMILY = "i.";
+  /** Default separator for Phoenix storage. */
+  private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
+
+  /** Connection string to the deployed Phoenix cluster. */
+  private String connString = null;
+  private Properties connProperties = new Properties();
+
+  public PhoenixOfflineAggregationWriterImpl(Properties prop) {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+    connProperties = PropertiesUtil.deepCopy(prop);
+  }
+
+  public PhoenixOfflineAggregationWriterImpl() {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Class.forName(DRIVER_CLASS_NAME);
+    // Read the connection string from the config; the default is used
+    // unless it has been overridden.
+    connString =
+        conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+            YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
+    super.init(conf);
+  }
+
+  @Override
+  public TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context, TimelineEntities entities,
+      OfflineAggregationInfo info) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    String sql = "UPSERT INTO " + info.getTableName()
+        + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
+        + ", created_time, metric_names) "
+        + "VALUES ("
+        + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
+        + "?, ?)";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("TimelineEntity write SQL: " + sql);
+    }
+
+    try (Connection conn = getConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      for (TimelineEntity entity : entities.getEntities()) {
+        HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
+        if (entity.getMetrics() != null) {
+          for (TimelineMetric m : entity.getMetrics()) {
+            formattedMetrics.put(m.getId(), m);
+          }
+        }
+        int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
+        ps.setLong(idx++, entity.getCreatedTime());
+        ps.setString(idx++,
+            StringUtils.join(formattedMetrics.keySet().toArray(),
+            AGGREGATION_STORAGE_SEPARATOR));
+        ps.execute();
+
+        storeEntityVariableLengthFields(entity, formattedMetrics, context,
+            conn, info);
+
+        conn.commit();
+      }
+    } catch (SQLException se) {
+      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+      throw new IOException(se);
+    } catch (Exception e) {
+      LOG.error("Exception on getting connection: " + e.getMessage());
+      throw new IOException(e);
+    }
+    return response;
+  }
+
+  /**
+   * Create Phoenix tables for offline aggregation storage if the tables do not
+   * exist.
+   *
+   * @throws IOException if any problem happens while creating Phoenix tables.
+   */
+  public void createPhoenixTables() throws IOException {
+    // Create tables if necessary
+    try (Connection conn = getConnection();
+        Statement stmt = conn.createStatement()) {
+      // Table schema defined as in YARN-3817.
+      String sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
+          + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name))";
+      stmt.executeUpdate(sql);
+      sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
+          + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
+      stmt.executeUpdate(sql);
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed in init data " + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return;
+  }
+
+  // Utility functions
+  @Private
+  @VisibleForTesting
+  Connection getConnection() throws IOException {
+    Connection conn;
+    try {
+      conn = DriverManager.getConnection(connString, connProperties);
+      conn.setAutoCommit(false);
+    } catch (SQLException se) {
+      LOG.error("Failed to connect to phoenix server! "
+          + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return conn;
+  }
+
+  // WARNING: This method will permanently drop a table!
+  @Private
+  @VisibleForTesting
+  void dropTable(String tableName) throws Exception {
+    try (Connection conn = getConnection();
+         Statement stmt = conn.createStatement()) {
+      String sql = "DROP TABLE " + tableName;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
+      throw se;
+    }
+  }
+
+  private static class DynamicColumns<K> {
+    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
+    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
+    private String columnFamilyPrefix;
+    private String type;
+    private Set<K> columns;
+
+    public DynamicColumns(String columnFamilyPrefix, String type,
+        Set<K> keyValues) {
+      this.columnFamilyPrefix = columnFamilyPrefix;
+      this.columns = keyValues;
+      this.type = type;
+    }
+  }
+
+  private static <K> StringBuilder appendColumnsSQL(
+      StringBuilder colNames, DynamicColumns<K> cfInfo) {
+    // Prepare the sql template by iterating through all keys
+    for (K key : cfInfo.columns) {
+      colNames.append(",").append(cfInfo.columnFamilyPrefix)
+          .append(key.toString()).append(cfInfo.type);
+    }
+    return colNames;
+  }
+
+  private static <K, V> int setValuesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos,
+      boolean convertToBytes) throws SQLException {
+    int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+      V value = entry.getValue();
+      if (value instanceof Collection) {
+        ps.setString(idx++, StringUtils.join(
+            (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
+      } else {
+        if (convertToBytes) {
+          try {
+            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+          } catch (IOException ie) {
+            LOG.error("Exception in converting values into bytes "
+                + ie.getMessage());
+            throw new SQLException(ie);
+          }
+        } else {
+          ps.setString(idx++, value.toString());
+        }
+      }
+    }
+    return idx;
+  }
+
+  private static <K, V> int setBytesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, true);
+  }
+
+  private static <K, V> int setStringsForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, false);
+  }
+
+  private static void storeEntityVariableLengthFields(TimelineEntity entity,
+      Map<String, TimelineMetric> formattedMetrics,
+      TimelineCollectorContext context, Connection conn,
+      OfflineAggregationInfo aggregationInfo) throws SQLException {
+    int numPlaceholders = 0;
+    StringBuilder columnDefs = new StringBuilder(
+        StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
+    if (formattedMetrics != null && formattedMetrics.size() > 0) {
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+          formattedMetrics.keySet()));
+      numPlaceholders += formattedMetrics.keySet().size();
+    }
+    if (numPlaceholders == 0) {
+      return;
+    }
+    StringBuilder placeholders = new StringBuilder();
+    placeholders.append(
+        StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
+    // numPlaceholders >= 1 now
+    placeholders.append("?")
+        .append(StringUtils.repeat(",?", numPlaceholders - 1));
+    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+        .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
+        .append(") VALUES(").append(placeholders).append(")").toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL statement for variable length fields: "
+          + sqlVariableLengthFields);
+    }
+    // Use try with resource statement for the prepared statement
+    try (PreparedStatement psVariableLengthFields =
+        conn.prepareStatement(sqlVariableLengthFields)) {
+      int idx = aggregationInfo.setStringsForPrimaryKey(
+          psVariableLengthFields, context, null, 1);
+      if (formattedMetrics != null && formattedMetrics.size() > 0) {
+        idx = setBytesForColumnFamily(
+            psVariableLengthFields, formattedMetrics, idx);
+      }
+      psVariableLengthFields.execute();
+    }
+  }
+}
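
For reference, a sketch of what the UPSERT template in writeAggregatedEntity expands to for the flow aggregation table, assuming its primary key list is (user, cluster, flow_name) as defined in createPhoenixTables():

    // Expanded UPSERT template for the flow aggregation table.
    String sql = "UPSERT INTO "
        + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
        + " (user,cluster,flow_name, created_time, metric_names) "
        + "VALUES (?,?,?,?, ?)";
    // setStringsForPrimaryKey() binds the three key strings at positions 1-3;
    // the created time (long) and the ';'-joined metric id list follow.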

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
new file mode 100644
index 0000000..6a1e086
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+/**
+ * Specifies the track along which entity information is to be aggregated.
+ */
+public enum TimelineAggregationTrack {
+  APP, FLOW, USER, QUEUE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
new file mode 100644
index 0000000..e8eabf1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+
+/** ATSv2 reader interface. */
+@Private
+@Unstable
+public interface TimelineReader extends Service {
+
+  /**
+   * Possible fields to retrieve for {@link #getEntities} and
+   * {@link #getEntity}.
+   */
+  public enum Field {
+    ALL,
+    EVENTS,
+    INFO,
+    METRICS,
+    CONFIGS,
+    RELATES_TO,
+    IS_RELATED_TO
+  }
+
+  /**
+   * <p>The API to fetch a single entity, given an identifier (which depends
+   * on the entity type), in the scope of the given context.</p>
+   * @param context Context which defines the scope in which query has to be
+   *    made. Use getters of {@link TimelineReaderContext} to fetch context
+   *    fields. Context contains the following :<br>
+   *    <ul>
+   *    <li><b>entityType</b> - Entity type (mandatory).</li>
+   *    <li><b>clusterId</b> - Identifies the cluster (mandatory).</li>
+   *    <li><b>userId</b> - Identifies the user.</li>
+   *    <li><b>flowName</b> - Context flow name.</li>
+   *    <li><b>flowRunId</b> - Context flow run id.</li>
+   *    <li><b>appId</b> - Context app id.</li>
+   *    <li><b>entityId</b> - Entity id.</li>
+   *    </ul>
+   *    Which context fields are mandatory depends on the entity type. Entity
+   *    type is always mandatory. In addition to entity type, below is the list
+   *    of context fields which are mandatory, based on entity type.<br>
+   *    <ul>
+   *    <li>If entity type is YARN_FLOW_RUN (i.e. query to fetch a specific
+   *    flow run), clusterId, userId, flowName and flowRunId are mandatory.</li>
+   *    <li>If entity type is YARN_APPLICATION (i.e. query to fetch a specific
+   *    app), query is within the scope of clusterId, userId, flowName,
+   *    flowRunId and appId. But out of this, only clusterId and appId are
+   *    mandatory. If only clusterId and appId are supplied, backend storage
+   *    must fetch the flow context information i.e. userId, flowName and
+   *    flowRunId first and based on that, fetch the app. If flow context
+   *    information is also given, app can be directly fetched.
+   *    </li>
+   *    <li>For other entity types (i.e. query to fetch generic entity), query
+   *    is within the scope of clusterId, userId, flowName, flowRunId, appId,
+   *    entityType and entityId. But out of this, only clusterId, appId,
+   *    entityType and entityId are mandatory. If flow context information is
+   *    not supplied, backend storage must fetch the flow context information
+   *    i.e. userId, flowName and flowRunId first and based on that, fetch the
+   *    entity. If flow context information is also given, entity can be
+   *    directly queried.
+   *    </li>
+   *    </ul>
+   * @param dataToRetrieve Specifies which data to retrieve for the entity. Use
+   *    getters of TimelineDataToRetrieve class to fetch dataToRetrieve
+   *    fields. All the dataToRetrieve fields are optional. Refer to
+   *    {@link TimelineDataToRetrieve} for details.
+   * @return A <cite>TimelineEntity</cite> instance or null. The entity will
+   *    contain the metadata plus the given fields to retrieve.<br>
+   *    If entityType is YARN_FLOW_RUN, entity returned is of type
+   *    <cite>FlowRunEntity</cite>.<br>
+   *    For all other entity types, entity returned is of type
+   *    <cite>TimelineEntity</cite>.
+   * @throws IOException if there is an exception encountered while fetching
+   *    entity from backend storage.
+   */
+  TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException;
+
+  /**
+   * <p>The API to search for a set of entities of the given entity type in
+   * the scope of the given context which matches the given predicates. The
+   * predicates include the created time window, limit to number of entities to
+   * be returned, and the entities can be filtered by checking whether they
+   * contain the given info/configs entries in the form of key/value pairs,
+   * given metrics in the form of metricsIds and its relation with metric
+   * values, given events in the form of the Ids, and whether they relate
+   * to/are related to other entities. For those parameters which have
+   * multiple entries, the qualified entity needs to meet all of them.</p>
+   *
+   * @param context Context which defines the scope in which query has to be
+   *    made. Use getters of {@link TimelineReaderContext} to fetch context
+   *    fields. Context contains the following :<br>
+   *    <ul>
+   *    <li><b>entityType</b> - Entity type (mandatory).</li>
+   *    <li><b>clusterId</b> - Identifies the cluster (mandatory).</li>
+   *    <li><b>userId</b> - Identifies the user.</li>
+   *    <li><b>flowName</b> - Context flow name.</li>
+   *    <li><b>flowRunId</b> - Context flow run id.</li>
+   *    <li><b>appId</b> - Context app id.</li>
+   *    </ul>
+   *    Although entityId is also part of context, it has no meaning for
+   *    getEntities.<br>
+   *    Which context fields are mandatory depends on the entity type. Entity
+   *    type is always mandatory. In addition to entity type, below is the list
+   *    of context fields which are mandatory, based on entity type.<br>
+   *    <ul>
+   *    <li>If entity type is YARN_FLOW_ACTIVITY (i.e. query to fetch flows),
+   *    only clusterId is mandatory.
+   *    </li>
+   *    <li>If entity type is YARN_FLOW_RUN (i.e. query to fetch flow runs),
+   *    clusterId, userId and flowName are mandatory.</li>
+   *    <li>If entity type is YARN_APPLICATION (i.e. query to fetch apps), we
+   *    can either get all apps within the context of flow name or within the
+   *    context of flow run. If apps are queried within the scope of flow name,
+   *    clusterId, userId and flowName are supplied. If they are queried within
+   *    the scope of flow run, clusterId, userId, flowName and flowRunId are
+   *    supplied.</li>
+   *    <li>For other entity types (i.e. query to fetch generic entities),
+   *    query is within the scope of clusterId, userId, flowName, flowRunId,
+   *    appId and entityType. But out of this, only clusterId, appId and
+   *    entityType are
+   *    mandatory. If flow context information is not supplied, backend storage
+   *    must fetch the flow context information i.e. userId, flowName and
+   *    flowRunId first and based on that, fetch the entities. If flow context
+   *    information is also given, entities can be directly queried.
+   *    </li>
+   *    </ul>
+   * @param filters Specifies filters which restrict the number of entities
+   *    to return. Use getters of TimelineEntityFilters class to fetch
+   *    various filters. All the filters are optional. Refer to
+   *    {@link TimelineEntityFilters} for details.
+   * @param dataToRetrieve Specifies which data to retrieve for each entity.
+   *    Use getters of TimelineDataToRetrieve class to fetch dataToRetrieve
+   *    fields. All the dataToRetrieve fields are optional. Refer to
+   *    {@link TimelineDataToRetrieve} for details.
+   * @return A set of <cite>TimelineEntity</cite> instances of the given entity
+   *    type in the given context scope which matches the given predicates
+   *    ordered by created time, descending. Each entity will only contain the
+   *    metadata (id, type and created time) plus the given fields to retrieve.
+   *    <br>
+   *    If entityType is YARN_FLOW_ACTIVITY, entities returned are of type
+   *    <cite>FlowActivityEntity</cite>.<br>
+   *    If entityType is YARN_FLOW_RUN, entities returned are of type
+   *    <cite>FlowRunEntity</cite>.<br>
+   *    For all other entity types, entities returned are of type
+   *    <cite>TimelineEntity</cite>.
+   * @throws IOException if there is an exception encountered while fetching
+   *    entities from the backend storage.
+   */
+  Set<TimelineEntity> getEntities(
+      TimelineReaderContext context,
+      TimelineEntityFilters filters,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException;
+}
\ No newline at end of file
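
A usage sketch for this interface may help. The TimelineReaderContext constructor order below is inferred from the context fields listed in the javadoc (clusterId, userId, flowName, flowRunId, appId, entityType, entityId), and the no-arg TimelineEntityFilters/TimelineDataToRetrieve constructors reflect the "all fields optional" contract; treat those exact signatures as assumptions:

    import java.io.IOException;
    import java.util.Set;

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
    import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
    import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;

    public final class TimelineReaderSketch {
      private TimelineReaderSketch() {
      }

      /** Fetch one application entity, then all entities in the same scope. */
      public static Set<TimelineEntity> query(TimelineReader reader)
          throws IOException {
        TimelineReaderContext ctx = new TimelineReaderContext(
            "cluster1", "user1", "flow1", 1002345678919L,
            "application_1111111111_2222", "YARN_APPLICATION", null);
        // Empty dataToRetrieve: only the entity metadata is fetched.
        TimelineEntity one = reader.getEntity(ctx, new TimelineDataToRetrieve());
        System.out.println("Fetched: " + one.getId());
        // Empty filters: no restriction beyond the context scope.
        return reader.getEntities(ctx, new TimelineEntityFilters(),
            new TimelineDataToRetrieve());
      }
    }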

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..33f5449
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This creates the schema for an HBase-based backend for storing application
+ * timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineSchemaCreator {
+  private TimelineSchemaCreator() {
+  }
+
+  final static String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Log LOG =
+      LogFactory.getLog(TimelineSchemaCreator.class);
+  private static final String PHOENIX_OPTION_SHORT = "p";
+  private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
+  private static final String APP_TABLE_NAME_SHORT = "a";
+  private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
+  private static final String TTL_OPTION_SHORT = "m";
+  private static final String ENTITY_TABLE_NAME_SHORT = "e";
+
+  public static void main(String[] args) throws Exception {
+
+    Configuration hbaseConf = HBaseConfiguration.create();
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    // Grab the entityTableName argument
+    String entityTableName
+        = commandLine.getOptionValue(ENTITY_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(entityTableName)) {
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
+    }
+    String entityTableTTLMetrics =
+        commandLine.getOptionValue(TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
+    }
+    // Grab the appToflowTableName argument
+    String appToflowTableName = commandLine.getOptionValue(
+        APP_TO_FLOW_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(appToflowTableName)) {
+      hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+    }
+    // Grab the applicationTableName argument
+    String applicationTableName = commandLine.getOptionValue(
+        APP_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(applicationTableName)) {
+      hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
+          applicationTableName);
+    }
+
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      boolean skipExisting
+          = commandLine.hasOption(SKIP_EXISTING_TABLE_OPTION_SHORT);
+      if (skipExisting) {
+        LOG.info("Will skip existing tables and continue on htable creation "
+            + "exceptions!");
+      }
+      createAllTables(hbaseConf, skipExisting);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: " + e.getMessage());
+      exceptions.add(e);
+    }
+
+    // Create Phoenix data schema if needed
+    if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
+      Configuration phoenixConf = new Configuration();
+      try {
+        PhoenixOfflineAggregationWriterImpl phoenixWriter =
+            new PhoenixOfflineAggregationWriterImpl();
+        phoenixWriter.init(phoenixConf);
+        phoenixWriter.start();
+        phoenixWriter.createPhoenixTables();
+        phoenixWriter.stop();
+        LOG.info("Successfully created Phoenix offline aggregation schema. ");
+      } catch (IOException e) {
+        LOG.error("Error in creating phoenix tables: " + e.getMessage());
+        exceptions.add(e);
+      }
+    }
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
+  }
+
+  /**
+   * Parse command-line arguments.
+   *
+   * @param args
+   *          command line arguments passed to program.
+   * @return parsed command line.
+   * @throws ParseException if the command-line arguments cannot be parsed.
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    Option o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
+        "entity table name");
+    o.setArgName("entityTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(TTL_OPTION_SHORT, "metricsTTL", true,
+        "TTL for metrics column family");
+    o.setArgName("metricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true,
+        "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true,
+        "application table name");
+    o.setArgName("applicationTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    // Options without an argument
+    // No need to set arg name since we do not need an argument here
+    o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
+        "create Phoenix offline aggregation tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
+        false, "skip existing HBase tables and continue to create new tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (Exception e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  @VisibleForTesting
+  public static void createAllTables(Configuration hbaseConf,
+      boolean skipExisting) throws IOException {
+
+    Connection conn = null;
+    try {
+      conn = ConnectionFactory.createConnection(hbaseConf);
+      Admin admin = conn.getAdmin();
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
+      try {
+        new EntityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new AppToFlowTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new ApplicationTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowRunTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+
+}
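
The @VisibleForTesting createAllTables(Configuration, boolean) entry point above can also be driven programmatically; a minimal sketch, assuming hbase-site.xml is on the classpath:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;

    public final class SchemaSetupSketch {
      private SchemaSetupSketch() {
      }

      public static void main(String[] args) throws IOException {
        // Picks up hbase-site.xml from the classpath.
        Configuration hbaseConf = HBaseConfiguration.create();
        // 'true' mirrors the -s flag: skip tables that already exist instead
        // of failing on the first IOException.
        TimelineSchemaCreator.createAllTables(hbaseConf, true);
      }
    }

From the command line, the -s flag gives the same skip-existing behaviour, and -p additionally creates the Phoenix offline aggregation tables.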

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
new file mode 100644
index 0000000..663a18a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+
+/**
+ * This interface is for storing application timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter extends Service {
+
+  /**
+   * Stores all the information in the given {@link TimelineEntities} object
+   * to the timeline store. Any errors occurring for individual write request
+   * objects will be reported in the response.
+   *
+   * @param clusterId context cluster ID
+   * @param userId context user ID
+   * @param flowName context flow name
+   * @param flowVersion context flow version
+   * @param flowRunId run id for the flow.
+   * @param appId context app ID.
+   * @param data
+   *          a {@link TimelineEntities} object.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException if there is any exception encountered while storing
+   *     or writing entities to the backend storage.
+   */
+  TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities data) throws IOException;
+
+  /**
+   * Aggregates the entity information to the timeline store, based on which
+   * track this entity is to be rolled up to. The tracks along which
+   * aggregations are to be done are given by {@link TimelineAggregationTrack}.
+   *
+   * Any errors occurring for individual write request objects will be reported
+   * in the response.
+   *
+   * @param data
+   *          a {@link TimelineEntity} object.
+   * @param track Specifies the track or dimension along which aggregation
+   *     would occur, given as a {@link TimelineAggregationTrack} enum value.
+   *     Includes USER, FLOW, QUEUE, etc.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException if there is any exception encountered while
+   *     aggregating entities to the backend storage.
+   */
+  TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException;
+
+  /**
+   * Flushes the data to the backend storage. Whatever may be buffered will be
+   * written to the storage when the method returns. This may be a potentially
+   * time-consuming operation, and should be used judiciously.
+   *
+   * @throws IOException if there is any exception encountered while flushing
+   *     entities to the backend storage.
+   */
+  void flush() throws IOException;
+}
\ No newline at end of file
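
A sketch tying write(), aggregate() and flush() together; the TimelineEntity and TimelineEntities mutators used here (setId, setType, addEntity) come from the ATSv2 records API and are assumptions relative to this patch, as are the example ids:

    import java.io.IOException;

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;

    public final class TimelineWriterSketch {
      private TimelineWriterSketch() {
      }

      public static void writeAndAggregate(TimelineWriter writer)
          throws IOException {
        TimelineEntity entity = new TimelineEntity();
        entity.setId("container_1111111111_2222_01_000001");  // hypothetical id
        entity.setType("YARN_CONTAINER");
        TimelineEntities batch = new TimelineEntities();
        batch.addEntity(entity);

        // Write within the full context: cluster, user, flow name/version/run, app.
        TimelineWriteResponse response = writer.write("cluster1", "user1",
            "flow1", "1", 1002345678919L, "application_1111111111_2222", batch);
        // response carries per-entity write errors, if any.
        // Roll the same entity up along the flow dimension.
        writer.aggregate(entity, TimelineAggregationTrack.FLOW);
        // Ensure anything still buffered reaches the backend.
        writer.flush();
      }
    }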

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
new file mode 100644
index 0000000..dde3911
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+  /**
+   * App id.
+   */
+  ID(ApplicationColumnFamily.INFO, "id"),
+
+  /**
+   * When the application was created.
+   */
+  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
+      new LongConverter()),
+
+  /**
+   * The version of the flow that this app belongs to.
+   */
+  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier) {
+    this(columnFamily, columnQualifier, GenericConverter.getInstance());
+  }
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier, ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Encode the qualifier up front to keep column-prefix handling consistent.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, attributes);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}.
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null
+   */
+  public static final ApplicationColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based only on name.
+      if (ac.getColumnQualifier().equals(columnQualifier)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) && x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null if no column
+   *         matches both arguments.
+   */
+  public static final ApplicationColumn columnFor(
+      ApplicationColumnFamily columnFamily, String name) {
+
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based on column family and name.
+      if (ac.columnFamily.equals(columnFamily)
+          && ac.getColumnQualifier().equals(name)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
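
A short sketch of the lookup and read paths defined above; the Result instance is assumed to come from an HBase Get or Scan against the application table:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;

    public final class ApplicationColumnSketch {
      private ApplicationColumnSketch() {
      }

      public static Object readCreatedTime(Result result) throws IOException {
        // Qualifier-only lookup; both lookups resolve to CREATED_TIME here.
        ApplicationColumn byName = ApplicationColumn.columnFor("created_time");
        ApplicationColumn byBoth = ApplicationColumn.columnFor(
            ApplicationColumnFamily.INFO, "created_time");
        System.out.println(byName == byBoth);  // true: enum constants are singletons
        // Decode the stored cell via the column's LongConverter.
        return ApplicationColumn.CREATED_TIME.readResult(result);
      }
    }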

