http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
deleted file mode 100644
index 2be6ef8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
-
-/**
- * Coprocessor for flow run table.
- */
-public class FlowRunCoprocessor extends BaseRegionObserver {
-
-  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
-  private boolean isFlowRunRegion = false;
-
-  private Region region;
-  /**
-   * Generates a timestamp that is unique per row in a region; this is
-   * per region.
-   */
-  private final TimestampGenerator timestampGenerator =
-      new TimestampGenerator();
-
-  @Override
-  public void start(CoprocessorEnvironment e) throws IOException {
-    if (e instanceof RegionCoprocessorEnvironment) {
-      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      this.region = env.getRegion();
-      isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
-          region.getRegionInfo(), env.getConfiguration());
-    }
-  }
-
-  public boolean isFlowRunRegion() {
-    return isFlowRunRegion;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * This method adds the tags onto the cells in the Put. It is presumed that
-   * all the cells in one Put have the same set of Tags. The existing cell
-   * timestamp is overwritten for non-metric cells and each such cell gets a
-   * new unique timestamp generated by {@link TimestampGenerator}.
-   *
-   * @see
-   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
-   * .hadoop.hbase.coprocessor.ObserverContext,
-   * org.apache.hadoop.hbase.client.Put,
-   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
-   * org.apache.hadoop.hbase.client.Durability)
-   */
-  @Override
-  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
-      WALEdit edit, Durability durability) throws IOException {
-    Map<String, byte[]> attributes = put.getAttributesMap();
-
-    if (!isFlowRunRegion) {
-      return;
-    }
-    // Assumption is that all the cells in a put are the same operation.
-    List<Tag> tags = new ArrayList<>();
-    if ((attributes != null) && (attributes.size() > 0)) {
-      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-        Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
-        tags.add(t);
-      }
-      byte[] tagByteArray = Tag.fromList(tags);
-      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
-          Bytes.BYTES_COMPARATOR);
-      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
-          .entrySet()) {
-        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
-        for (Cell cell : entry.getValue()) {
-          // for each cell in the put add the tags
-          // Assumption is that all the cells in
-          // one put are the same operation
-          // also, get a unique cell timestamp for non-metric cells
-          // this way we don't inadvertently overwrite cell versions
-          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
-          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
-              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
-              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
-              tagByteArray));
-        }
-        newFamilyMap.put(entry.getKey(), newCells);
-      } // for each entry
-      // Update the family map for the Put
-      put.setFamilyCellMap(newFamilyMap);
-    }
-  }
-
-  /**
-   * Determines if the current cell's timestamp is to be used or a new unique
-   * cell timestamp is to be used. This is done to avoid inadvertently
-   * overwriting cells when writes come in very fast. But for metric cells, the
-   * cell timestamp signifies the metric timestamp. Hence we don't want to
-   * overwrite it.
-   *
-   * @param timestamp cell timestamp from the incoming put
-   * @param tags tags associated with the cell
-   * @return cell timestamp
-   */
-  private long getCellTimestamp(long timestamp, List<Tag> tags) {
-    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
-    // then use the generator
-    if (timestamp == HConstants.LATEST_TIMESTAMP) {
-      return timestampGenerator.getUniqueTimestamp();
-    } else {
-      return timestamp;
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Creates a {@link FlowScanner} Scan so that it can correctly process the
-   * contents of {@link FlowRunTable}.
-   *
-   * @see
-   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
-   * .hadoop.hbase.coprocessor.ObserverContext,
-   * org.apache.hadoop.hbase.client.Get, java.util.List)
-   */
-  @Override
-  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
-      Get get, List<Cell> results) throws IOException {
-    if (!isFlowRunRegion) {
-      return;
-    }
-
-    Scan scan = new Scan(get);
-    scan.setMaxVersions();
-    RegionScanner scanner = null;
-    try {
-      scanner = new FlowScanner(e.getEnvironment(), scan,
-          region.getScanner(scan), FlowScannerOperation.READ);
-      scanner.next(results);
-      e.bypass();
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Ensures that max versions are set for the Scan so that metrics can be
-   * correctly aggregated and min/max can be correctly determined.
-   *
-   * @see
-   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
-   * .apache.hadoop.hbase.coprocessor.ObserverContext,
-   * org.apache.hadoop.hbase.client.Scan,
-   * org.apache.hadoop.hbase.regionserver.RegionScanner)
-   */
-  @Override
-  public RegionScanner preScannerOpen(
-      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner scanner) throws IOException {
-
-    if (isFlowRunRegion) {
-      // set max versions for scan to see all
-      // versions to aggregate for metrics
-      scan.setMaxVersions();
-    }
-    return scanner;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Creates a {@link FlowScanner} Scan so that it can correctly process the
-   * contents of {@link FlowRunTable}.
-   *
-   * @see
-   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
-   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
-   * org.apache.hadoop.hbase.client.Scan,
-   * org.apache.hadoop.hbase.regionserver.RegionScanner)
-   */
-  @Override
-  public RegionScanner postScannerOpen(
-      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner scanner) throws IOException {
-    if (!isFlowRunRegion) {
-      return scanner;
-    }
-    return new FlowScanner(e.getEnvironment(), scan,
-        scanner, FlowScannerOperation.READ);
-  }
-
-  @Override
-  public InternalScanner preFlush(
-      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner) throws IOException {
-    if (!isFlowRunRegion) {
-      return scanner;
-    }
-    if (LOG.isDebugEnabled()) {
-      if (store != null) {
-        LOG.debug("preFlush store = " + store.getColumnFamilyName()
-            + " flushableSize=" + store.getFlushableSize()
-            + " flushedCellsCount=" + store.getFlushedCellsCount()
-            + " compactedCellsCount=" + store.getCompactedCellsCount()
-            + " majorCompactedCellsCount="
-            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
-            + store.getMemstoreFlushSize() + " memstoreSize="
-            + store.getMemStoreSize() + " size=" + store.getSize()
-            + " storeFilesCount=" + store.getStorefilesCount());
-      }
-    }
-    return new FlowScanner(c.getEnvironment(), scanner,
-        FlowScannerOperation.FLUSH);
-  }
-
-  @Override
-  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, StoreFile resultFile) {
-    if (!isFlowRunRegion) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      if (store != null) {
-        LOG.debug("postFlush store = " + store.getColumnFamilyName()
-            + " flushableSize=" + store.getFlushableSize()
-            + " flushedCellsCount=" + store.getFlushedCellsCount()
-            + " compactedCellsCount=" + store.getCompactedCellsCount()
-            + " majorCompactedCellsCount="
-            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
-            + store.getMemstoreFlushSize() + " memstoreSize="
-            + store.getMemStoreSize() + " size=" + store.getSize()
-            + " storeFilesCount=" + store.getStorefilesCount());
-      }
-    }
-  }
-
-  @Override
-  public InternalScanner preCompact(
-      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionRequest request)
-      throws IOException {
-
-    if (!isFlowRunRegion) {
-      return scanner;
-    }
-    FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
-    if (request != null) {
-      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
-          : FlowScannerOperation.MINOR_COMPACTION);
-      LOG.info("Compaction request=" + request.toString() + " "
-          + requestOp.toString() + " RegionName=" + e.getEnvironment()
-              .getRegion().getRegionInfo().getRegionNameAsString());
-    }
-    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
-  }
-}
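The prePut() hook above gives every non-metric cell a new, strictly unique timestamp (via TimestampGenerator) so that rapid writes to the same row do not overwrite each other's versions. A minimal, self-contained sketch of that idea follows; it is an illustration only, not the actual TimestampGenerator implementation.

    import java.util.concurrent.atomic.AtomicLong;

    /** Sketch of a per-region unique timestamp source (illustration only). */
    public class UniqueTimestampSketch {
      private final AtomicLong lastTimestamp = new AtomicLong();

      /** Returns a timestamp strictly greater than any handed out before. */
      public long getUniqueTimestamp() {
        while (true) {
          long now = System.currentTimeMillis();
          long last = lastTimestamp.get();
          long next = Math.max(now, last + 1);
          if (lastTimestamp.compareAndSet(last, next)) {
            return next;
          }
        }
      }
    }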

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
deleted file mode 100644
index 8fda9a8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Represents a rowkey for the flow run table.
- */
-public class FlowRunRowKey {
-  private final String clusterId;
-  private final String userId;
-  private final String flowName;
-  private final Long flowRunId;
-  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
-      new FlowRunRowKeyConverter();
-
-  public FlowRunRowKey(String clusterId, String userId, String flowName,
-      Long flowRunId) {
-    this.clusterId = clusterId;
-    this.userId = userId;
-    this.flowName = flowName;
-    this.flowRunId = flowRunId;
-  }
-
-  public String getClusterId() {
-    return clusterId;
-  }
-
-  public String getUserId() {
-    return userId;
-  }
-
-  public String getFlowName() {
-    return flowName;
-  }
-
-  public Long getFlowRunId() {
-    return flowRunId;
-  }
-
-  /**
-   * Constructs a row key for the flow run table as follows: {
-   * clusterId!userId!flowName!Inverted Flow Run Id}.
-   *
-   * @return byte array with the row key
-   */
-  public byte[] getRowKey() {
-    return flowRunRowKeyConverter.encode(this);
-  }
-
-
-  /**
-   * Given the raw row key as bytes, returns the row key as an object.
-   *
-   * @param rowKey Byte representation of row key.
-   * @return A <cite>FlowRunRowKey</cite> object.
-   */
-  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
-    return new FlowRunRowKeyConverter().decode(rowKey);
-  }
-
-  /**
-   * Returns the flow run row key as a verbose String output.
-   * @return String
-   */
-  @Override
-  public String toString() {
-    StringBuilder flowKeyStr = new StringBuilder();
-    flowKeyStr.append("{clusterId=" + clusterId);
-    flowKeyStr.append(" userId=" + userId);
-    flowKeyStr.append(" flowName=" + flowName);
-    flowKeyStr.append(" flowRunId=");
-    flowKeyStr.append(flowRunId);
-    flowKeyStr.append("}");
-    return flowKeyStr.toString();
-  }
-
-  /**
-   * Encodes and decodes the row key for the flow run table.
-   * The row key is of the form: clusterId!userId!flowName!flowrunId.
-   * flowrunId is a long and the rest are strings.
-   * <p>
-   */
-  final private static class FlowRunRowKeyConverter implements
-      KeyConverter<FlowRunRowKey> {
-
-    private FlowRunRowKeyConverter() {
-    }
-
-    /**
-     * The flow run row key is of the form clusterId!userId!flowName!flowrunId
-     * with each segment separated by !. The sizes below indicate sizes of each
-     * one of these segments in sequence. clusterId, userId and flowName are
-     * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
-     * in size (i.e. end whenever separator is encountered). This is used while
-     * decoding and helps in determining where to split.
-     */
-    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
-
-    /*
-     * (non-Javadoc)
-     *
-     * Encodes FlowRunRowKey object into a byte array with each component/field
-     * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow
-     * run row key of the form clusterId!userId!flowName!flowrunId. If flowRunId
-     * in passed FlowRunRowKey object is null (and the fields preceding it i.e.
-     * clusterId, userId and flowName are not null), this returns a row key
-     * prefix of the form clusterId!userId!flowName!. flowRunId is inverted
-     * while encoding as it helps maintain a descending order for flow keys in
-     * flow run table.
-     *
-     * @see
-     * org.apache.hadoop.yarn.server.timelineservice.storage.common
-     * .KeyConverter#encode(java.lang.Object)
-     */
-    @Override
-    public byte[] encode(FlowRunRowKey rowKey) {
-      byte[] first =
-          Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
-              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
-              .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
-                  Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
-              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
-      if (rowKey.getFlowRunId() == null) {
-        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
-      } else {
-        // Note that flowRunId is a long, so we can't encode them all at the
-        // same
-        // time.
-        byte[] second =
-            Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
-        return Separator.QUALIFIERS.join(first, second);
-      }
-    }
-
-    /*
-     * (non-Javadoc)
-     *
-     * Decodes a flow run row key of the form
-     * clusterId!userId!flowName!flowrunId represented in byte format and
-     * converts it into a FlowRunRowKey object. flowRunId is inverted while
-     * decoding as it was inverted while encoding.
-     *
-     * @see
-     * org.apache.hadoop.yarn.server.timelineservice.storage.common
-     * .KeyConverter#decode(byte[])
-     */
-    @Override
-    public FlowRunRowKey decode(byte[] rowKey) {
-      byte[][] rowKeyComponents =
-          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-      if (rowKeyComponents.length != 4) {
-        throw new IllegalArgumentException("the row key is not valid for "
-            + "a flow run");
-      }
-      String clusterId =
-          Separator.decode(Bytes.toString(rowKeyComponents[0]),
-              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-      String userId =
-          Separator.decode(Bytes.toString(rowKeyComponents[1]),
-              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-      String flowName =
-          Separator.decode(Bytes.toString(rowKeyComponents[2]),
-              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-      Long flowRunId =
-          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
-      return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
-    }
-  }
-}
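The encoder above inverts flowRunId so that newer runs sort first in the flow run table. A small sketch of why that works, assuming LongConverter.invertLong behaves roughly like Long.MAX_VALUE minus the value (an assumption, not confirmed by this diff): HBase compares row keys lexicographically on bytes, so the inverted encoding of a newer run id compares as smaller and is scanned first.

    import org.apache.hadoop.hbase.util.Bytes;

    public class InvertedRunIdDemo {
      // Assumption: LongConverter.invertLong(v) behaves like Long.MAX_VALUE - v.
      static long invert(long value) {
        return Long.MAX_VALUE - value;
      }

      public static void main(String[] args) {
        byte[] olderRun = Bytes.toBytes(invert(1000L));
        byte[] newerRun = Bytes.toBytes(invert(2000L));
        // Row keys compare byte-by-byte, so the inverted newer run id is
        // smaller and is returned first by a forward scan.
        System.out.println(Bytes.compareTo(newerRun, olderRun) < 0); // prints true
      }
    }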

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
deleted file mode 100644
index 23ebc66..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-
-/**
- * Represents a partial rowkey (without the flowRunId) for the flow run table.
- */
-public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
-    RowKeyPrefix<FlowRunRowKey> {
-
-  /**
-   * Constructs a row key prefix for the flow run table as follows:
-   * {@code clusterId!userId!flowName!}.
-   *
-   * @param clusterId identifying the cluster
-   * @param userId identifying the user
-   * @param flowName identifying the flow
-   */
-  public FlowRunRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    super(clusterId, userId, flowName, null);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
-   * RowKeyPrefix#getRowKeyPrefix()
-   */
-  public byte[] getRowKeyPrefix() {
-    // We know we're a FlowRunRowKey with null flowRunId, so we can simply
-    // delegate
-    return super.getRowKey();
-  }
-
-}
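A hypothetical usage sketch for the prefix class above: the cluster, user, and flow names are made up, and the scan settings mirror what the coprocessor expects (all versions visible for aggregation). It assumes an HBase client version that provides Scan#setRowPrefixFilter.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;

    public class FlowRunPrefixScanExample {
      /** Builds a scan over every run of one flow (names are made up). */
      static Scan scanAllRunsOfFlow() {
        byte[] prefix = new FlowRunRowKeyPrefix("yarn-cluster", "alice", "word-count")
            .getRowKeyPrefix();
        Scan scan = new Scan();
        scan.setRowPrefixFilter(prefix); // rows starting with clusterId!userId!flowName!
        scan.setMaxVersions();           // all versions visible, as the coprocessor expects
        return scan;
      }
    }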

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
deleted file mode 100644
index 547bef0..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-
-/**
- * The flow run table has one column family, info.
- * It stores per flow run information
- * aggregated across applications.
- *
- * Metrics are also stored in the info column family.
- *
- * Example flow run table record:
- *
- * <pre>
- * flow_run table
- * |-------------------------------------------|
- * |  Row key   | Column Family                |
- * |            | info                         |
- * |-------------------------------------------|
- * | clusterId! | flow_version:version7        |
- * | userName!  |                              |
- * | flowName!  | running_apps:1               |
- * | flowRunId  |                              |
- * |            | min_start_time:1392995080000 |
- * |            | #0:""                        |
- * |            |                              |
- * |            | min_start_time:1392995081012 |
- * |            | #0:appId2                    |
- * |            |                              |
- * |            | min_start_time:1392993083210 |
- * |            | #0:appId3                    |
- * |            |                              |
- * |            |                              |
- * |            | max_end_time:1392993084018   |
- * |            | #0:""                        |
- * |            |                              |
- * |            |                              |
- * |            | m!mapInputRecords:127        |
- * |            | #0:""                        |
- * |            |                              |
- * |            | m!mapInputRecords:31         |
- * |            | #2:appId2                    |
- * |            |                              |
- * |            | m!mapInputRecords:37         |
- * |            | #1:appId3                    |
- * |            |                              |
- * |            |                              |
- * |            | m!mapOutputRecords:181       |
- * |            | #0:""                        |
- * |            |                              |
- * |            | m!mapOutputRecords:37        |
- * |            | #1:appId3                    |
- * |            |                              |
- * |            |                              |
- * |-------------------------------------------|
- * </pre>
- */
-public class FlowRunTable extends BaseTable<FlowRunTable> {
-  /** flow run prefix. */
-  private static final String PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
-
-  /** config param name that specifies the flowrun table name. */
-  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
-
-  /** default value for flowrun table name. */
-  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
-
-  private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
-
-  /** default max number of versions. */
-  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
-
-  public FlowRunTable() {
-    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
-   * (org.apache.hadoop.hbase.client.Admin,
-   * org.apache.hadoop.conf.Configuration)
-   */
-  public void createTable(Admin admin, Configuration hbaseConf)
-      throws IOException {
-
-    TableName table = getTableName(hbaseConf);
-    if (admin.tableExists(table)) {
-      // do not disable / delete existing table
-      // similar to the approach taken by map-reduce jobs when
-      // output directory exists
-      throw new IOException("Table " + table.getNameAsString()
-          + " already exists.");
-    }
-
-    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
-    HColumnDescriptor infoCF =
-        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
-    infoCF.setBloomFilterType(BloomType.ROWCOL);
-    flowRunTableDescp.addFamily(infoCF);
-    infoCF.setMinVersions(1);
-    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
-
-    // TODO: figure the split policy
-    flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
-        .getCanonicalName());
-    admin.createTable(flowRunTableDescp);
-    LOG.info("Status of table creation for " + table.getNameAsString() + "="
-        + admin.tableExists(table));
-  }
-}
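A hypothetical setup sketch for the table class above; note that createTable() deliberately throws an IOException when the table already exists, mirroring how MapReduce jobs refuse an existing output directory.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

    public class CreateFlowRunTable {
      public static void main(String[] args) throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
             Admin admin = conn.getAdmin()) {
          // Creates timelineservice.flowrun with the info column family and the
          // FlowRunCoprocessor attached; throws if the table already exists.
          new FlowRunTable().createTable(admin, hbaseConf);
        }
      }
    }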

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
deleted file mode 100644
index 0e3c8ee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ /dev/null
@@ -1,728 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Invoked via the coprocessor when a Get or a Scan is issued for flow run
- * table. Looks through the list of cells per row, checks their tags and does
- * operation on those cells as per the cell tags. Transforms reads of the
- * stored metrics into calculated sums for each column. Also finds the min and
- * max for start and end times in a flow run.
- */
-class FlowScanner implements RegionScanner, Closeable {
-
-  private static final Log LOG = LogFactory.getLog(FlowScanner.class);
-
-  /**
-   * Use a special application id to represent the flow id. This is needed
-   * since TimestampGenerator parses the app id to generate a cell timestamp.
-   */
-  private static final String FLOW_APP_ID = "application_00000000000_0000";
-
-  private final Region region;
-  private final InternalScanner flowRunScanner;
-  private final int batchSize;
-  private final long appFinalValueRetentionThreshold;
-  private RegionScanner regionScanner;
-  private boolean hasMore;
-  private byte[] currentRow;
-  private List<Cell> availableCells = new ArrayList<>();
-  private int currentIndex;
-  private FlowScannerOperation action = FlowScannerOperation.READ;
-
-  FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
-      FlowScannerOperation action) {
-    this(env, null, internalScanner, action);
-  }
-
-  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
-      InternalScanner internalScanner, FlowScannerOperation action) {
-    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
-    // TODO initialize other scan attributes like Scan#maxResultSize
-    this.flowRunScanner = internalScanner;
-    if (internalScanner instanceof RegionScanner) {
-      this.regionScanner = (RegionScanner) internalScanner;
-    }
-    this.action = action;
-    if (env == null) {
-      this.appFinalValueRetentionThreshold =
-          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
-      this.region = null;
-    } else {
-      this.region = env.getRegion();
-      Configuration hbaseConf = env.getConfiguration();
-      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
-          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
-          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(" batch size=" + batchSize);
-    }
-  }
-
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
-   */
-  @Override
-  public HRegionInfo getRegionInfo() {
-    return region.getRegionInfo();
-  }
-
-  @Override
-  public boolean nextRaw(List<Cell> cells) throws IOException {
-    return nextRaw(cells, ScannerContext.newBuilder().build());
-  }
-
-  @Override
-  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
-      throws IOException {
-    return nextInternal(cells, scannerContext);
-  }
-
-  @Override
-  public boolean next(List<Cell> cells) throws IOException {
-    return next(cells, ScannerContext.newBuilder().build());
-  }
-
-  @Override
-  public boolean next(List<Cell> cells, ScannerContext scannerContext)
-      throws IOException {
-    return nextInternal(cells, scannerContext);
-  }
-
-  /**
-   * Get value converter associated with a column or a column prefix. If
-   * nothing matches, generic converter is returned.
-   * @param colQualifierBytes the column qualifier in bytes
-   * @return value converter implementation.
-   */
-  private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
-    // Iterate over all the column prefixes for flow run table and get the
-    // appropriate converter for the column qualifier passed if prefix matches.
-    for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
-      byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
-      if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
-          colQualifierBytes, 0, colPrefixBytes.length) == 0) {
-        return colPrefix.getValueConverter();
-      }
-    }
-    // Iterate over all the columns for flow run table and get the
-    // appropriate converter for the column qualifier passed if match occurs.
-    for (FlowRunColumn column : FlowRunColumn.values()) {
-      if (Bytes.compareTo(
-          column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
-        return column.getValueConverter();
-      }
-    }
-    // Return generic converter if nothing matches.
-    return GenericConverter.getInstance();
-  }
-
-  /**
-   * This method loops through the cells in a given row of the
-   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
-   * to process the contents. It then calculates the sum or min or max for each
-   * column or returns the cell as is.
-   *
-   * @param cells
-   * @param scannerContext
-   * @return true if next row is available for the scanner, false otherwise
-   * @throws IOException
-   */
-  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
-      throws IOException {
-    Cell cell = null;
-    startNext();
-    // Loop through all the cells in this row
-    // For min/max/metrics we do need to scan the entire set of cells to get the
-    // right one
-    // But with flush/compaction, the number of cells being scanned will go down
-    // cells are grouped per column qualifier then sorted by cell timestamp
-    // (latest to oldest) per column qualifier
-    // So all cells in one qualifier come one after the other before we see the
-    // next column qualifier
-    ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
-    AggregationOperation currentAggOp = null;
-    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
-    Set<String> alreadySeenAggDim = new HashSet<>();
-    int addedCnt = 0;
-    long currentTimestamp = System.currentTimeMillis();
-    ValueConverter converter = null;
-    int limit = batchSize;
-
-    while (limit <= 0 || addedCnt < limit) {
-      cell = peekAtNextCell(scannerContext);
-      if (cell == null) {
-        break;
-      }
-      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
-      if (previousColumnQualifier == null) {
-        // first time in loop
-        previousColumnQualifier = currentColumnQualifier;
-      }
-
-      converter = getValueConverter(currentColumnQualifier);
-      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
-        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-            converter, currentTimestamp);
-        resetState(currentColumnCells, alreadySeenAggDim);
-        previousColumnQualifier = currentColumnQualifier;
-        currentAggOp = getCurrentAggOp(cell);
-        converter = getValueConverter(currentColumnQualifier);
-      }
-      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
-          converter, scannerContext);
-      nextCell(scannerContext);
-    }
-    if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
-      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
-          currentTimestamp);
-      if (LOG.isDebugEnabled()) {
-        if (addedCnt > 0) {
-          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
-              + " rowKey="
-              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
-        } else {
-          LOG.debug("emitted no cells for " + this.action);
-        }
-      }
-    }
-    return hasMore();
-  }
-
-  private AggregationOperation getCurrentAggOp(Cell cell) {
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-        cell.getTagsLength());
-    // We assume that all the operations for a particular column are the same
-    return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
-  }
-
-  /**
-   * resets the parameters to an initialized state for next loop iteration.
-   *
-   * @param currentColumnCells cells collected for the current column
-   * @param alreadySeenAggDim aggregation dimensions already encountered
-   */
-  private void resetState(SortedSet<Cell> currentColumnCells,
-      Set<String> alreadySeenAggDim) {
-    currentColumnCells.clear();
-    alreadySeenAggDim.clear();
-  }
-
-  private void collectCells(SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim, ValueConverter converter,
-      ScannerContext scannerContext) throws IOException {
-
-    if (currentAggOp == null) {
-      // not a min/max/metric cell, so just return it as is
-      currentColumnCells.add(cell);
-      return;
-    }
-
-    switch (currentAggOp) {
-    case GLOBAL_MIN:
-      if (currentColumnCells.size() == 0) {
-        currentColumnCells.add(cell);
-      } else {
-        Cell currentMinCell = currentColumnCells.first();
-        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
-            (NumericValueConverter) converter);
-        if (!currentMinCell.equals(newMinCell)) {
-          currentColumnCells.remove(currentMinCell);
-          currentColumnCells.add(newMinCell);
-        }
-      }
-      break;
-    case GLOBAL_MAX:
-      if (currentColumnCells.size() == 0) {
-        currentColumnCells.add(cell);
-      } else {
-        Cell currentMaxCell = currentColumnCells.first();
-        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
-            (NumericValueConverter) converter);
-        if (!currentMaxCell.equals(newMaxCell)) {
-          currentColumnCells.remove(currentMaxCell);
-          currentColumnCells.add(newMaxCell);
-        }
-      }
-      break;
-    case SUM:
-    case SUM_FINAL:
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("In collect cells "
-            + " FlowScannerOperation="
-            + this.action
-            + " currentAggOp="
-            + currentAggOp
-            + " cell qualifier="
-            + Bytes.toString(CellUtil.cloneQualifier(cell))
-            + " cell value= "
-            + converter.decodeValue(CellUtil.cloneValue(cell))
-            + " timestamp=" + cell.getTimestamp());
-      }
-
-      // only if this app has not been seen yet, add to current column cells
-      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-          cell.getTagsLength());
-      String aggDim = HBaseTimelineStorageUtils
-          .getAggregationCompactionDimension(tags);
-      if (!alreadySeenAggDim.contains(aggDim)) {
-        // if this agg dimension has already been seen,
-        // since they show up in sorted order
-        // we drop the rest which are older
-        // in other words, this cell is older than previously seen cells
-        // for that agg dim
-        // but when this agg dim is not seen,
-        // consider this cell in our working set
-        currentColumnCells.add(cell);
-        alreadySeenAggDim.add(aggDim);
-      }
-      break;
-    default:
-      break;
-    } // end of switch case
-  }
-
-  /*
-   * Processes the cells in input param currentColumnCells and populates
-   * List<Cell> cells as the output based on the input AggregationOperation
-   * parameter.
-   */
-  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, ValueConverter converter,
-      long currentTimestamp) throws IOException {
-    if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
-      return 0;
-    }
-    if (currentAggOp == null) {
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
-          + currentColumnCells.size() + " currentAggOp" + currentAggOp);
-    }
-
-    switch (currentAggOp) {
-    case GLOBAL_MIN:
-    case GLOBAL_MAX:
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    case SUM:
-    case SUM_FINAL:
-      switch (action) {
-      case FLUSH:
-      case MINOR_COMPACTION:
-        cells.addAll(currentColumnCells);
-        return currentColumnCells.size();
-      case READ:
-        Cell sumCell = processSummation(currentColumnCells,
-            (NumericValueConverter) converter);
-        cells.add(sumCell);
-        return 1;
-      case MAJOR_COMPACTION:
-        List<Cell> finalCells = processSummationMajorCompaction(
-            currentColumnCells, (NumericValueConverter) converter,
-            currentTimestamp);
-        cells.addAll(finalCells);
-        return finalCells.size();
-      default:
-        cells.addAll(currentColumnCells);
-        return currentColumnCells.size();
-      }
-    default:
-      cells.addAll(currentColumnCells);
-      return currentColumnCells.size();
-    }
-  }
-
-  /*
-   * Returns a cell whose value is the sum of all cell values in the input set.
-   * The new cell created has the timestamp of the most recent metric cell. The
-   * sum of a metric for a flow run is the summation at the point of the last
-   * metric update in that flow till that time.
-   */
-  private Cell processSummation(SortedSet<Cell> currentColumnCells,
-      NumericValueConverter converter) throws IOException {
-    Number sum = 0;
-    Number currentValue = 0;
-    long ts = 0L;
-    long mostCurrentTimestamp = 0L;
-    Cell mostRecentCell = null;
-    for (Cell cell : currentColumnCells) {
-      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
-      ts = cell.getTimestamp();
-      if (mostCurrentTimestamp < ts) {
-        mostCurrentTimestamp = ts;
-        mostRecentCell = cell;
-      }
-      sum = converter.add(sum, currentValue);
-    }
-    byte[] sumBytes = converter.encodeValue(sum);
-    Cell sumCell =
-        HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
-    return sumCell;
-  }
-
-
-  /**
-   * Returns a list of cells that contains
-   *
-   * A) the latest cells for applications that haven't finished yet
-   * B) summation
-   * for the flow, based on applications that have completed and are older than
-   * a certain time
-   *
-   * The new cell created has the timestamp of the most recent metric cell. The
-   * sum of a metric for a flow run is the summation at the point of the last
-   * metric update in that flow till that time.
-   */
-  @VisibleForTesting
-  List<Cell> processSummationMajorCompaction(
-      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
-      long currentTimestamp)
-      throws IOException {
-    Number sum = 0;
-    Number currentValue = 0;
-    long ts = 0L;
-    boolean summationDone = false;
-    List<Cell> finalCells = new ArrayList<Cell>();
-    if (currentColumnCells == null) {
-      return finalCells;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("In processSummationMajorCompaction,"
-          + " will drop cells older than " + currentTimestamp
-          + " CurrentColumnCells size=" + currentColumnCells.size());
-    }
-
-    for (Cell cell : currentColumnCells) {
-      AggregationOperation cellAggOp = getCurrentAggOp(cell);
-      // if this is the existing flow sum cell
-      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
-          cell.getTagsLength());
-      String appId = HBaseTimelineStorageUtils
-          .getAggregationCompactionDimension(tags);
-      if (FLOW_APP_ID.equals(appId)) {
-        sum = converter.add(sum, currentValue);
-        summationDone = true;
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("reading flow app id sum=" + sum);
-        }
-      } else {
-        currentValue = (Number) converter.decodeValue(CellUtil
-            .cloneValue(cell));
-        // read the timestamp truncated by the generator
-        ts =  TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
-        if ((cellAggOp == AggregationOperation.SUM_FINAL)
-            && ((ts + this.appFinalValueRetentionThreshold)
-                < currentTimestamp)) {
-          sum = converter.add(sum, currentValue);
-          summationDone = true;
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("MAJOR COMPACTION loop sum= " + sum
-                + " discarding now: " + " qualifier="
-                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
-                + converter.decodeValue(CellUtil.cloneValue(cell))
-                + " timestamp=" + cell.getTimestamp() + " " + this.action);
-          }
-        } else {
-          // not a final value but it's the latest cell for this app
-          // so include this cell in the list of cells to write back
-          finalCells.add(cell);
-        }
-      }
-    }
-    if (summationDone) {
-      Cell anyCell = currentColumnCells.first();
-      List<Tag> tags = new ArrayList<Tag>();
-      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
-          Bytes.toBytes(FLOW_APP_ID));
-      tags.add(t);
-      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
-          Bytes.toBytes(FLOW_APP_ID));
-      tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
-      Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
-          CellUtil.cloneRow(anyCell),
-          CellUtil.cloneFamily(anyCell),
-          CellUtil.cloneQualifier(anyCell),
-          TimestampGenerator.getSupplementedTimestamp(
-              System.currentTimeMillis(), FLOW_APP_ID),
-              converter.encodeValue(sum), tagByteArray);
-      finalCells.add(sumCell);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
-            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
-            + " " + this.action);
-      }
-      LOG.info("After major compaction for qualifier="
-          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
-          + " with currentColumnCells.size="
-          + currentColumnCells.size()
-          + " returning finalCells.size=" + finalCells.size()
-          + " with sum=" + sum.longValue()
-          + " with cell timestamp " + sumCell.getTimestamp());
-    } else {
-      String qualifier = "";
-      LOG.info("After major compaction for qualifier=" + qualifier
-          + " with currentColumnCells.size="
-          + currentColumnCells.size()
-          + " returning finalCells.size=" + finalCells.size()
-          + " with zero sum="
-          + sum.longValue());
-    }
-    return finalCells;
-  }
-
-  /**
-   * Determines which cell is to be returned based on the values in each cell
-   * and the comparison operation MIN or MAX.
-   *
-   * @param previouslyChosenCell
-   * @param currentCell
-   * @param currentAggOp
-   * @return the cell which is the min (or max) cell
-   * @throws IOException
-   */
-  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
-      AggregationOperation currentAggOp, NumericValueConverter converter)
-      throws IOException {
-    if (previouslyChosenCell == null) {
-      return currentCell;
-    }
-    try {
-      Number previouslyChosenCellValue = (Number)converter.decodeValue(
-          CellUtil.cloneValue(previouslyChosenCell));
-      Number currentCellValue = (Number) converter.decodeValue(CellUtil
-          .cloneValue(currentCell));
-      switch (currentAggOp) {
-      case GLOBAL_MIN:
-        if (converter.compare(
-            currentCellValue, previouslyChosenCellValue) < 0) {
-          // new value is minimum, hence return this cell
-          return currentCell;
-        } else {
-          // previously chosen value is minimum, hence return previous min cell
-          return previouslyChosenCell;
-        }
-      case GLOBAL_MAX:
-        if (converter.compare(
-            currentCellValue, previouslyChosenCellValue) > 0) {
-          // new value is max, hence return this cell
-          return currentCell;
-        } else {
-          // previously chosen value is max, hence return previous max cell
-          return previouslyChosenCell;
-        }
-      default:
-        return currentCell;
-      }
-    } catch (IllegalArgumentException iae) {
-      LOG.error("caught iae during conversion to long ", iae);
-      return currentCell;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (flowRunScanner != null) {
-      flowRunScanner.close();
-    } else {
-      LOG.warn("scanner close called but scanner is null");
-    }
-  }
-
-  /**
-   * Called to signal the start of the next() call by the scanner.
-   */
-  public void startNext() {
-    currentRow = null;
-  }
-
-  /**
-   * Returns whether or not the underlying scanner has more rows.
-   */
-  public boolean hasMore() {
-    return currentIndex < availableCells.size() ? true : hasMore;
-  }
-
-  /**
-   * Returns the next available cell for the current row and advances the
-   * pointer to the next cell. This method can be called multiple times in a row
-   * to advance through all the available cells.
-   *
-   * @param scannerContext
-   *          context information for the batch of cells under consideration
-   * @return the next available cell or null if no more cells are available for
-   *         the current row
-   * @throws IOException
-   */
-  public Cell nextCell(ScannerContext scannerContext) throws IOException {
-    Cell cell = peekAtNextCell(scannerContext);
-    if (cell != null) {
-      currentIndex++;
-    }
-    return cell;
-  }
-
-  /**
-   * Returns the next available cell for the current row, without advancing the
-   * pointer. Calling this method multiple times in a row will continue to
-   * return the same cell.
-   *
-   * @param scannerContext
-   *          context information for the batch of cells under consideration
-   * @return the next available cell or null if no more cells are available for
-   *         the current row
-   * @throws IOException if any problem is encountered while grabbing the next
-   *     cell.
-   */
-  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
-    if (currentIndex >= availableCells.size()) {
-      // done with current batch
-      availableCells.clear();
-      currentIndex = 0;
-      hasMore = flowRunScanner.next(availableCells, scannerContext);
-    }
-    Cell cell = null;
-    if (currentIndex < availableCells.size()) {
-      cell = availableCells.get(currentIndex);
-      if (currentRow == null) {
-        currentRow = CellUtil.cloneRow(cell);
-      } else if (!CellUtil.matchingRow(cell, currentRow)) {
-        // moved on to the next row
-        // don't use the current cell
-        // also signal no more cells for this row
-        return null;
-      }
-    }
-    return cell;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
-   */
-  @Override
-  public long getMaxResultSize() {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.isFilterDone() called when the flow "
-              + "scanner's scanner is not a RegionScanner");
-    }
-    return regionScanner.getMaxResultSize();
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
-   */
-  @Override
-  public long getMvccReadPoint() {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.isFilterDone() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.getMvccReadPoint();
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
-   */
-  @Override
-  public boolean isFilterDone() throws IOException {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.isFilterDone() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.isFilterDone();
-
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
-   */
-  @Override
-  public boolean reseek(byte[] bytes) throws IOException {
-    if (regionScanner == null) {
-      throw new IllegalStateException(
-          "RegionScanner.reseek() called when the flow "
-              + "scanner's internal scanner is not a RegionScanner");
-    }
-    return regionScanner.reseek(bytes);
-  }
-
-  @Override
-  public int getBatch() {
-    return batchSize;
-  }
-}
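
For reference, the FlowScanner removed above exposes a per-row cell iterator on top of the region's internal scanner: startNext() resets the row tracking, peekAtNextCell() refills the internal cell batch from the wrapped scanner when it runs dry, and nextCell() hands back cells until the row key changes. A minimal caller sketch under those assumptions; the FlowScanner and ScannerContext instances are presumed to come from the coprocessor environment and are not constructed here, and the helper class name is made up for illustration.

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.regionserver.ScannerContext;

  // Hypothetical caller, not part of the patch: drains one row through the
  // startNext()/peekAtNextCell()/nextCell() contract described above.
  final class FlowScannerUsageSketch {
    static List<Cell> readOneRow(FlowScanner flowScanner, ScannerContext context)
        throws IOException {
      List<Cell> rowCells = new ArrayList<>();
      flowScanner.startNext();                  // forget the previous row key
      Cell cell;
      while ((cell = flowScanner.nextCell(context)) != null) {
        rowCells.add(cell);                     // nextCell() stops at a row boundary
      }
      return rowCells;                          // hasMore() says whether rows remain
    }
  }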

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
deleted file mode 100644
index 73c666f..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-
-/**
- * Identifies the scanner operation on the {@link FlowRunTable}.
- */
-public enum FlowScannerOperation {
-
-  /**
-   * If the scanner is opened for reading
-   * during preGet or preScan.
-   */
-  READ,
-
-  /**
-   * If the scanner is opened during preFlush.
-   */
-  FLUSH,
-
-  /**
-   * If the scanner is opened during minor Compaction.
-   */
-  MINOR_COMPACTION,
-
-  /**
-   * If the scanner is opened during major Compaction.
-   */
-  MAJOR_COMPACTION
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
deleted file mode 100644
index 04963f3..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
- * contains classes related to implementation for flow related tables, viz. flow
- * run table and flow activity table.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
deleted file mode 100644
index aa2bfda..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve, true);
-  }
-
-  public ApplicationEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  /**
-   * This method is called only for multiple entity reads.
-   */
-  @Override
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    // Filters here cannot be null for multiple entity reads as they are set in
-    // augmentParams if null.
-    TimelineEntityFilters filters = getFilters();
-    FilterList listBasedOnFilters = new FilterList();
-    // Create filter list based on created time range and add it to
-    // listBasedOnFilters.
-    long createdTimeBegin = filters.getCreatedTimeBegin();
-    long createdTimeEnd = filters.getCreatedTimeEnd();
-    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-          ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
-    }
-    // Create filter list based on metric filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList metricFilters = filters.getMetricFilters();
-    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.METRIC, metricFilters));
-    }
-    // Create filter list based on config filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList configFilters = filters.getConfigFilters();
-    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.CONFIG, configFilters));
-    }
-    // Create filter list based on info filters and add it to listBasedOnFilters
-    TimelineFilterList infoFilters = filters.getInfoFilters();
-    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.INFO, infoFilters));
-    }
-    return listBasedOnFilters;
-  }
-
-  /**
-   * Add {@link QualifierFilter} filters to filter list for each column of
-   * application table.
-   *
-   * @param list filter list to which qualifier filters have to be added.
-   */
-  @Override
-  protected void updateFixedColumns(FilterList list) {
-    for (ApplicationColumn column : ApplicationColumn.values()) {
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryComparator(column.getColumnQualifierBytes())));
-    }
-  }
-
-  /**
-   * Creates a filter list which indicates that only some of the column
-   * qualifiers in the info column family will be returned in result.
-   *
-   * @return filter list.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  private FilterList createFilterListForColsOfInfoFamily()
-      throws IOException {
-    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
-    // Add filters for each column in entity table.
-    updateFixedColumns(infoFamilyColsFilter);
-    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
-    // If INFO field has to be retrieved, add a filter for fetching columns
-    // with INFO column prefix.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
-    }
-    TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      // If RELATES_TO field has to be retrieved, add a filter for fetching
-      // columns with RELATES_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO));
-    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain RELATES_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // relatesTo filters are specified. relatesTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> relatesToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.RELATES_TO, relatesToCols));
-    }
-    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
-      // columns with IS_RELATED_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
-    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // isRelatedTo filters are specified. isRelatedTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> isRelatedToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
-    }
-    TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
-      // If EVENTS field has to be retrieved, add a filter for fetching columns
-      // with EVENT column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.EVENT));
-    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain EVENTS, we still need to
-      // have a filter to fetch some of the column qualifiers on the basis of
-      // event filters specified. Event filters will then be matched after
-      // fetching rows from HBase.
-      Set<String> eventCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.EVENT, eventCols));
-    }
-    return infoFamilyColsFilter;
-  }
-
-  /**
-   * Exclude column prefixes via filters which are not required (based on fields
-   * to retrieve) from info column family. These filters are added to filter
-   * list which contains a filter for getting info column family.
-   *
-   * @param infoColFamilyList filter list for info column family.
-   */
-  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
-    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
-    // Events not required.
-    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
-    }
-    // info not required.
-    if (!hasField(fieldsToRetrieve, Field.INFO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
-    }
-    // is related to not required.
-    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
-    }
-    // relates to not required.
-    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
-    }
-  }
-
-  /**
-   * Updates filter list based on fields for confs and metrics to retrieve.
-   *
-   * @param listBasedOnFields filter list based on fields.
-   * @throws IOException if any problem occurs while updating filter list.
-   */
-  private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Please note that if confsToRetrieve is specified, we would have added
-    // CONFS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
-      // Create a filter list for configs.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getConfsToRetrieve(),
-              ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG));
-    }
-
-    // Please note that if metricsToRetrieve is specified, we would have added
-    // METRICS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
-      // Create a filter list for metrics.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getMetricsToRetrieve(),
-              ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC));
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
-    if (!needCreateFilterListBasedOnFields()) {
-      // Fetch all the columns. No need of a filter.
-      return null;
-    }
-    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
-      // We can fetch only some of the columns from info family.
-      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
-    } else {
-      // Exclude column prefixes in info column family which are not required
-      // based on fields to retrieve.
-      excludeFieldsFromInfoColFamily(infoColFamilyList);
-    }
-    listBasedOnFields.addFilter(infoColFamilyList);
-
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
-    return listBasedOnFields;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    TimelineReaderContext context = getContext();
-    ApplicationRowKey applicationRowKey =
-        new ApplicationRowKey(context.getClusterId(), context.getUserId(),
-            context.getFlowName(), context.getFlowRunId(), context.getAppId());
-    byte[] rowKey = applicationRowKey.getRowKey();
-    Get get = new Get(rowKey);
-    get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return getTable().getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getEntityType(),
-        "entityType shouldn't be null");
-    if (isSingleEntityRead()) {
-      Preconditions.checkNotNull(getContext().getAppId(),
-          "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(getContext().getUserId(),
-          "userId shouldn't be null");
-      Preconditions.checkNotNull(getContext().getFlowName(),
-          "flowName shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    TimelineReaderContext context = getContext();
-    if (isSingleEntityRead()) {
-      // Get flow context information from AppToFlow table.
-      if (context.getFlowName() == null || context.getFlowRunId() == null
-          || context.getUserId() == null) {
-        AppToFlowRowKey appToFlowRowKey =
-            new AppToFlowRowKey(context.getClusterId(), context.getAppId());
-        FlowContext flowContext =
-            lookupFlowContext(appToFlowRowKey,
-                hbaseConf, conn);
-        context.setFlowName(flowContext.getFlowName());
-        context.setFlowRunId(flowContext.getFlowRunId());
-        context.setUserId(flowContext.getUserId());
-      }
-    }
-    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
-    // metricsToRetrieve are specified.
-    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
-    if (!isSingleEntityRead()) {
-      createFiltersIfNull();
-    }
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    TimelineReaderContext context = getContext();
-    // Whether or not flowRunID is null doesn't matter, the
-    // ApplicationRowKeyPrefix will do the right thing.
-    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
-        new ApplicationRowKeyPrefix(context.getClusterId(),
-            context.getUserId(), context.getFlowName(),
-            context.getFlowRunId());
-    scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(getFilters().getLimit()));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    String entityId = ApplicationColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    TimelineEntityFilters filters = getFilters();
-    // fetch created time
-    Long createdTime = (Long) ApplicationColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime);
-
-    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // isRelatedTo are not set in HBase scan.
-    boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
-          filters.getIsRelatedTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve,
-          Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities and match relatesTo filter. If relatesTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // relatesTo are not set in HBase scan.
-    boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
-        checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
-          filters.getRelatesTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-    }
-
-    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-    }
-
-    // fetch events and match event filters if they exist. If event filters do
-    // not match, entity would be dropped. We have to match filters locally
-    // as relevant HBase filters to filter out rows on the basis of events
-    // are not set in HBase scan.
-    boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null &&
-        filters.getEventFilters().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, ApplicationColumnPrefix.EVENT);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
-          filters.getEventFilters())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (hasField(fieldsToRetrieve, Field.METRICS)) {
-      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-    }
-    return entity;
-  }
-}
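
For reference, the ApplicationEntityReader removed above narrows HBase reads in two stages: constructFilterListBasedOnFields() assembles a FilterList around the info column family, and getResults() wraps it in a row-prefix Scan capped by a PageFilter, with the number of cell versions bounded for metrics. A standalone sketch of that scan shape; the row-key prefix, page limit, column-family name, and version cap below are chosen purely for illustration, and the helper class is not part of the patch.

  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.filter.BinaryComparator;
  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  import org.apache.hadoop.hbase.filter.FamilyFilter;
  import org.apache.hadoop.hbase.filter.FilterList;
  import org.apache.hadoop.hbase.filter.PageFilter;
  import org.apache.hadoop.hbase.util.Bytes;

  // Hypothetical helper: shows only the scan shape used by the removed reader.
  final class ApplicationScanSketch {
    static Scan buildScan(byte[] rowKeyPrefix, long limit, int metricsLimit) {
      Scan scan = new Scan();
      scan.setRowPrefixFilter(rowKeyPrefix);      // restrict to one flow run's apps
      FilterList filters = new FilterList();      // MUST_PASS_ALL by default
      filters.addFilter(new PageFilter(limit));   // cap the number of rows returned
      filters.addFilter(new FamilyFilter(CompareOp.EQUAL,
          new BinaryComparator(Bytes.toBytes("i")))); // assumed info column family name
      scan.setFilter(filters);
      scan.setMaxVersions(metricsLimit);          // bounded metric versions
      return scan;
    }
  }

In the deleted code the prefix comes from ApplicationRowKeyPrefix, the page limit from TimelineEntityFilters.getLimit(), and the version cap from TimelineDataToRetrieve.getMetricsLimit().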

