diff --git 
new file mode 100644
index 0000000..ee57890
--- /dev/null
@@ -0,0 +1,303 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
+ * the License.
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.hadoop.hbase.util.Bytes;
+ * Used to separate row qualifiers, column qualifiers and compount fields.
+ */
+public enum Separator {
+  /**
+   * separator in key or column qualifier fields
+   */
+  QUALIFIERS("!", "%0$"),
+  /**
+   * separator in values, and/or compound key/column qualifier fields.
+   */
+  VALUES("?", "%1$"),
+  /**
+   * separator in values, often used to avoid having these in qualifiers and
+   * names. Note that if we use HTML form encoding through URLEncoder, we end 
+   * getting a + for a space, which may already occur in strings, so we don't
+   * want that.
+   */
+  SPACE(" ", "%2$");
+  /**
+   * The string value of this separator.
+   */
+  private final String value;
+  /**
+   * The URLEncoded version of this separator
+   */
+  private final String encodedValue;
+  /**
+   * The bye representation of value.
+   */
+  private final byte[] bytes;
+  /**
+   * The value quoted so that it can be used as a safe regex
+   */
+  private final String quotedValue;
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  /**
+   * @param value of the separator to use. Cannot be null or empty string.
+   * @param encodedValue choose something that isn't likely to occur in the 
+   *          itself. Cannot be null or empty string.
+   */
+  private Separator(String value, String encodedValue) {
+    this.value = value;
+    this.encodedValue = encodedValue;
+    // validation
+    if (value == null || value.length() == 0 || encodedValue == null
+        || encodedValue.length() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot create separator from null or empty string.");
+    }
+    this.bytes = Bytes.toBytes(value);
+    this.quotedValue = Pattern.quote(value);
+  }
+  /**
+   * Used to make token safe to be used with this separator without collisions.
+   *
+   * @param token
+   * @return the token with any occurrences of this separator URLEncoded.
+   */
+  public String encode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    return token.replace(value, encodedValue);
+  }
+  /**
+   * @param token
+   * @return the token with any occurrences of the encoded separator replaced 
+   *         the separator itself.
+   */
+  public String decode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    return token.replace(encodedValue, value);
+  }
+  /**
+   * Encode the given separators in the token with their encoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process. See also {@link #decode(String, 
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return non-null byte representation of the token with occurrences of the
+   *         separators encoded.
+   */
+  public static byte[] encode(String token, Separator... separators) {
+    if (token == null) {
+      return EMPTY_BYTES;
+    }
+    String result = token;
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = separator.encode(result);
+      }
+    }
+    return Bytes.toBytes(result);
+  }
+  /**
+   * Decode the given separators in the token with their decoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(byte[] token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    return decode(Bytes.toString(token), separators);
+  }
+  /**
+   * Decode the given separators in the token with their decoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(String token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    String result = token;
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = separator.decode(result);
+      }
+    }
+    return result;
+  }
+  /**
+   * Returns a single byte array containing all of the individual arrays
+   * components separated by this separator.
+   *
+   * @param components
+   * @return byte array after joining the components
+   */
+  public byte[] join(byte[]... components) {
+    if (components == null || components.length == 0) {
+      return EMPTY_BYTES;
+    }
+    int finalSize = 0;
+    finalSize = this.value.length() * (components.length - 1);
+    for (byte[] comp : components) {
+      if (comp != null) {
+        finalSize += comp.length;
+      }
+    }
+    byte[] buf = new byte[finalSize];
+    int offset = 0;
+    for (int i = 0; i < components.length; i++) {
+      if (components[i] != null) {
+        System.arraycopy(components[i], 0, buf, offset, components[i].length);
+        offset += components[i].length;
+      }
+      if (i < (components.length - 1)) {
+        System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
+        offset += this.value.length();
+      }
+    }
+    return buf;
+  }
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items join, {@code toString()} will be called in each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Note that when separator is {@literal 
+   *         the result is simply all items concatenated and the process is not
+   *         reversible through {@link #splitEncoded(String)}
+   */
+  public String joinEncoded(String... items) {
+    if (items == null || items.length == 0) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder(encode(items[0].toString()));
+    // Start at 1, we've already grabbed the first value at index 0
+    for (int i = 1; i < items.length; i++) {
+      sb.append(this.value);
+      sb.append(encode(items[i].toString()));
+    }
+    return sb.toString();
+  }
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items join, {@code toString()} will be called in each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Note that when separator is {@literal 
+   *         the result is simply all items concatenated and the process is not
+   *         reversible through {@link #splitEncoded(String)}
+   */
+  public String joinEncoded(Iterable<?> items) {
+    if (items == null) {
+      return "";
+    }
+    Iterator<?> i = items.iterator();
+    if (!i.hasNext()) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder(encode(;
+    while (i.hasNext()) {
+      sb.append(this.value);
+      sb.append(encode(;
+    }
+    return sb.toString();
+  }
+  /**
+   * @param compoundValue containing individual values separated by this
+   *          separator, which have that separator encoded.
+   * @return non-null set of values from the compoundValue with the separator
+   *         decoded.
+   */
+  public Collection<String> splitEncoded(String compoundValue) {
+    List<String> result = new ArrayList<String>();
+    if (compoundValue != null) {
+      for (String value : compoundValue.split(quotedValue)) {
+        result.add(decode(value));
+      }
+    }
+    return result;
+  }
+  /**
+   * Splits the source array into multiple array segments using this separator,
+   * up to a maximum of count items. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   * @param source to be split
+   * @param limit on how many segments are supposed to be returned. Negative
+   *          value indicates no limit on number of segments.
+   * @return source split by this separator.
+   */
+  public byte[][] split(byte[] source, int limit) {
+    return TimelineWriterUtils.split(source, this.bytes, limit);
+  }
diff --git 
new file mode 100644
index 0000000..5518a27
--- /dev/null
@@ -0,0 +1,68 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+ * contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information
+ */
+public class TimelineEntitySchemaConstants {
+  /**
+   * Used to create a pre-split for tables starting with a username in the
+   * prefix. TODO: this may have to become a config variable (string with
+   * separators) so that different installations can presplit based on their 
+   * commonly occurring names.
+   */
+  private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+      Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+      Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+      Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+      Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+      Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+      Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+      Bytes.toBytes("z") };
+  /**
+   * The length at which keys auto-split
+   */
+  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+  /**
+   * @return splits for splits where a user is a prefix.
+   */
+  public final static byte[][] getUsernameSplits() {
+    byte[][] kloon = USERNAME_SPLITS.clone();
+    // Deep copy.
+    for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+      kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+    }
+    return kloon;
+  }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..28a0b6a
--- /dev/null
@@ -0,0 +1,127 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+ * bunch of utility functions used across TimelineWriter classes
+ */
+public class TimelineWriterUtils {
+  /** empty bytes */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @return byte[] array after splitting the source
+   */
+  public static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, -1);
+  }
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @param limit a negative value indicates no limit on number of segments.
+   * @return byte[][] after splitting the input source
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+    return splitRanges(source, separator, -1);
+  }
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator,
+      int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    if ((source == null) || (separator == null)) {
+      return segments;
+    }
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > 0 && segments.size() >= (limit - 1)) {
+        // everything else goes in one final segment
+        break;
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..64a11f8
--- /dev/null
@@ -0,0 +1,28 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.hbase.client.BufferedMutator;
+ * Just a typed wrapper around {@link BufferedMutator} used to ensure that
+ * columns can write only to the table mutator for the right table.
+ */
+public interface TypedBufferedMutator<T> extends BufferedMutator {
+  // This class is intentionally left (almost) blank
diff --git 
new file mode 100644
index 0000000..32577fb
--- /dev/null
@@ -0,0 +1,24 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git 
new file mode 100644
index 0000000..90da966
--- /dev/null
@@ -0,0 +1,141 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+ * Identifies fully qualified columns for the {@link EntityTable}.
+ */
+public enum EntityColumn implements Column<EntityTable> {
+  /**
+   * Identifier for the entity.
+   */
+  ID(EntityColumnFamily.INFO, "id"),
+  /**
+   * The type of entity
+   */
+  TYPE(EntityColumnFamily.INFO, "type"),
+  /**
+   * When the entity was created.
+   */
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+  /**
+   * When it was modified.
+   */
+  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
+  /**
+   * The version of the flow that this entity belongs to.
+   */
+  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version");
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+  private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<EntityTable>(columnFamily);
+  }
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null
+   */
+  public static final EntityColumn columnFor(String columnQualifier) {
+    // Match column based on value, assume column family matches.
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+    // Default to null
+    return null;
+  }
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final EntityColumn columnFor(EntityColumnFamily columnFamily,
+      String name) {
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based column family and on name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+    // Default to null
+    return null;
+  }
diff --git 
new file mode 100644
index 0000000..8a95d12
--- /dev/null
@@ -0,0 +1,65 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.hbase.util.Bytes;
+ * Represents the entity table column families.
+ */
+public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i"),
+  /**
+   * Configurations are in a separate column family for two reasons: a) the 
+   * of the config values can be very large and b) we expect that config values
+   * are often separately accessed from other metrics and info columns.
+   */
+  CONFIGS("c"),
+  /**
+   * Metrics have a separate column family, because they have a separate TTL.
+   */
+  METRICS("m");
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private EntityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
diff --git 
new file mode 100644
index 0000000..4459868
--- /dev/null
@@ -0,0 +1,212 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Map;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+ * Identifies partially qualified columns for the entity table.
+ */
+public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+  /**
+   * To store TimelineEntity getIsRelatedToEntities values.
+   */
+  IS_RELATED_TO(EntityColumnFamily.INFO, "s"),
+  /**
+   * To store TimelineEntity getRelatesToEntities values.
+   */
+  RELATES_TO(EntityColumnFamily.INFO, "r"),
+  /**
+   * Lifecycle events for an entity
+   */
+  EVENT(EntityColumnFamily.INFO, "e"),
+  /**
+   * Config column stores configuration with config key as the column name.
+   */
+  CONFIG(EntityColumnFamily.CONFIGS, null),
+  /**
+   * Metrics are stored with the metric name as the column name.
+   */
+  METRIC(EntityColumnFamily.METRICS, null);
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   */
+  private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix) {
+    column = new ColumnHelper<EntityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+  }
+  /**
+   * @return the column name value
+   */
+  private String getColumnPrefix() {
+    return columnPrefix;
+  }
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   *
+   * #store(byte[],
+   *
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   *
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException 
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   *
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   *
+   * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>> 
+      Result result) throws IOException {
+    return column.readTimeseriesResults(result, columnPrefixBytes);
+  }
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is 
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null
+   */
+  public static final EntityColumnPrefix columnFor(String columnPrefix) {
+    // Match column based on value, assume column family matches.
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (ecp.getColumnPrefix().equals(columnPrefix)) {
+        return ecp;
+      }
+    }
+    // Default to null
+    return null;
+  }
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is 
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code (x == y == null)} or
+   * {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final EntityColumnPrefix columnFor(
+      EntityColumnFamily columnFamily, String columnPrefix) {
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based column family and on name.
+      if (ecp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) || 
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return ecp;
+      }
+    }
+    // Default to null
+    return null;
+  }
diff --git 
new file mode 100644
index 0000000..61958c2
--- /dev/null
@@ -0,0 +1,93 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+ * Represents a rowkey for the entity table.
+ */
+public class EntityRowKey {
+  // TODO: more methods are needed for this class.
+  // TODO: API needs to be cleaned up.
+  /**
+   * Constructs a row key prefix for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+  /**
+   * Constructs a row key prefix for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId, TimelineEntity te) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] third =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
+            te.getId()));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+  /**
+   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first 
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
diff --git 
new file mode 100644
index 0000000..61f7c4c
--- /dev/null
@@ -0,0 +1,161 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+ * The entity table as column families info, config and metrics. Info stores
+ * information about a timeline entity object config stores configuration data
+ * of a timeline entity object metrics stores the metrics of a timeline entity
+ * object
+ *
+ * Example entity table record:
+ *
+ * <pre>
+ * |--------------------------------------------------------------------|
+ * |  Row       | Column Family           | Column Family| Column Family|
+ * |  key       | info                    | metrics      | config       |
+ * |--------------------------------------------------------------------|
+ * | userName!  | id:entityId             | metricId1:   | configKey1:  |
+ * | clusterId! |                         | metricValue1 | configValue1 |
+ * | flowId!    | type:entityType         | @timestamp1  |              |
+ * | flowRunId! |                         |              | configKey2:  |
+ * | AppId!     | created_time:           | metriciD1:   | configValue2 |
+ * | entityType!| 1392993084018           | metricValue2 |              |
+ * | entityId   |                         | @timestamp2  |              |
+ * |            | modified_time:          |              |              |
+ * |            | 1392995081012           | metricId2:   |              |
+ * |            |                         | metricValue1 |              |
+ * |            | r!relatesToKey:         | @timestamp2  |              |
+ * |            | id3?id4?id5             |              |              |
+ * |            |                         |              |              |
+ * |            | s!isRelatedToKey        |              |              |
+ * |            | id7?id9?id6             |              |              |
+ * |            |                         |              |              |
+ * |            | e!eventId?eventInfoKey: |              |              |
+ * |            | eventInfoValue          |              |              |
+ * |            |                         |              |              |
+ * |            | flowVersion:            |              |              |
+ * |            | versionValue            |              |              |
+ * |--------------------------------------------------------------------|
+ * </pre>
+ */
+public class EntityTable extends BaseTable<EntityTable> {
+  /** entity prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+  /** config param name that specifies the entity table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + "";
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * entity table
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+  /** default value for entity table name */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+  /** default TTL is 30 days for metrics timeseries */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+  /** default max number of versions */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
+  private static final Log LOG = LogFactory.getLog(EntityTable.class);
+  public EntityTable() {
+  }
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   *
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    entityTableDescp.addFamily(infoCF);
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    entityTableDescp.addFamily(configCF);
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+    entityTableDescp.addFamily(metricsCF);
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+    entityTableDescp
+    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(entityTableDescp,
+        TimelineEntitySchemaConstants.getUsernameSplits());
+"Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+  /**
+   * @param metricsTTL time to live parameter for the metricss in this table.
+   * @param hbaseConf configururation in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
diff --git 
new file mode 100644
index 0000000..26f1cc5
--- /dev/null
@@ -0,0 +1,25 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git 
index f999b4d..6abf240 100644
@@ -18,43 +18,41 @@
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
- * Unit test HBaseTimelineWriterImpl
- * YARN 3411
- *
  * @throws Exception
 public class TestHBaseTimelineWriterImpl {
@@ -69,12 +67,8 @@ public class TestHBaseTimelineWriterImpl {
   private static void createSchema() throws IOException {
-    byte[][] families = new byte[3][];
-    families[0] = EntityColumnFamily.INFO.getInBytes();
-    families[1] = EntityColumnFamily.CONFIG.getInBytes();
-    families[2] = EntityColumnFamily.METRICS.getInBytes();
-    TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(),
-        util.getConfiguration());
+    new EntityTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
@@ -151,18 +145,15 @@ public class TestHBaseTimelineWriterImpl {
       // scan the table and see that entity exists
       Scan s = new Scan();
-      byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, 
-          runid, appName);
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      ResultScanner scanner = null;
-      TableName entityTableName = TableName
-          .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME);
       Connection conn = ConnectionFactory.createConnection(c1);
-      Table entityTable = conn.getTable(entityTableName);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
       int rowCount = 0;
       int colCount = 0;
-      scanner = entityTable.getScanner(s);
       for (Result result : scanner) {
         if (result != null && !result.isEmpty()) {
@@ -172,37 +163,77 @@ public class TestHBaseTimelineWriterImpl {
           // check info column family
-          NavigableMap<byte[], byte[]> infoValues = result
-              .getFamilyMap(EntityColumnFamily.INFO.getInBytes());
-          String id1 = TimelineWriterUtils.getValueAsString(
-              EntityColumnDetails.ID.getInBytes(), infoValues);
+          String id1 = EntityColumn.ID.readResult(result).toString();
           assertEquals(id, id1);
-          String type1 = TimelineWriterUtils.getValueAsString(
-              EntityColumnDetails.TYPE.getInBytes(), infoValues);
+          String type1 = EntityColumn.TYPE.readResult(result).toString();
           assertEquals(type, type1);
-          Long cTime1 = TimelineWriterUtils.getValueAsLong(
-              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+          Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
+          Long cTime1 = val.longValue();
           assertEquals(cTime1, cTime);
-          Long mTime1 = TimelineWriterUtils.getValueAsLong(
-              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+          val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
+          Long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);
-          checkRelatedEntities(isRelatedTo, infoValues,
-              EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes());
-          checkRelatedEntities(relatesTo, infoValues,
-              EntityColumnDetails.PREFIX_RELATES_TO.getInBytes());
-          // check config column family
-          NavigableMap<byte[], byte[]> configValuesResult = result
-              .getFamilyMap(EntityColumnFamily.CONFIG.getInBytes());
-          checkConfigs(configValuesResult, conf);
-          NavigableMap<byte[], byte[]> metricsResult = result
-              .getFamilyMap(EntityColumnFamily.METRICS.getInBytes());
-          checkMetricsSizeAndKey(metricsResult, metrics);
-          List<Cell> metricCells = result.getColumnCells(
-              EntityColumnFamily.METRICS.getInBytes(),
-              Bytes.toBytes(m1.getId()));
-          checkMetricsTimeseries(metricCells, m1);
+          // Remember isRelatedTo is of type Map<String, Set<String>>
+          for (String isRelatedToKey : isRelatedTo.keySet()) {
+            Object isRelatedToValue =
+                EntityColumnPrefix.IS_RELATED_TO.readResult(result,
+                    isRelatedToKey);
+            String compoundValue = isRelatedToValue.toString();
+            // id7?id9?id6
+            Set<String> isRelatedToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+                isRelatedToValues.size());
+            for (String v : isRelatedTo.get(isRelatedToKey)) {
+              assertTrue(isRelatedToValues.contains(v));
+            }
+          }
+          // RelatesTo
+          for (String relatesToKey : relatesTo.keySet()) {
+            String compoundValue =
+                EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
+                    .toString();
+            // id3?id4?id5
+            Set<String> relatesToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(relatesTo.get(relatesToKey).size(),
+                relatesToValues.size());
+            for (String v : relatesTo.get(relatesToKey)) {
+              assertTrue(relatesToValues.contains(v));
+            }
+          }
+          // Configuration
+          Map<String, Object> configColumns =
+              EntityColumnPrefix.CONFIG.readResults(result);
+          assertEquals(conf.size(), configColumns.size());
+          for (String configItem : conf.keySet()) {
+            assertEquals(conf.get(configItem), configColumns.get(configItem));
+          }
+          NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+              EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+          NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+          // We got metrics back
+          assertNotNull(metricMap);
+          // Same number of metrics as we wrote
+          assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
+              .size());
+          // Iterate over original metrics and confirm that they are present
+          // here.
+          for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+            assertEquals(metricEntry.getValue(),
+                metricMap.get(metricEntry.getKey()));
+          }
       assertEquals(1, rowCount);
@@ -212,80 +243,77 @@ public class TestHBaseTimelineWriterImpl {
-  }
-  private void checkMetricsTimeseries(List<Cell> metricCells,
-      TimelineMetric m1) throws IOException {
-    Map<Long, Number> timeseries = m1.getValues();
-    assertEquals(timeseries.size(), metricCells.size());
-    for (Cell c1 : metricCells) {
-      assertTrue(timeseries.containsKey(c1.getTimestamp()));
-      assertEquals(,
-          timeseries.get(c1.getTimestamp()));
-    }
-  }
-  private void checkMetricsSizeAndKey(
-      NavigableMap<byte[], byte[]> metricsResult, Set<TimelineMetric> metrics) 
-    assertEquals(metrics.size(), metricsResult.size());
-    for (TimelineMetric m1 : metrics) {
-      byte[] key = Bytes.toBytes(m1.getId());
-      assertTrue(metricsResult.containsKey(key));
-    }
-  }
-  private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
-      Map<String, String> conf) throws IOException {
-    assertEquals(conf.size(), configValuesResult.size());
-    byte[] columnName;
-    for (String key : conf.keySet()) {
-      columnName = Bytes.toBytes(key);
-      assertTrue(configValuesResult.containsKey(columnName));
-      byte[] value = configValuesResult.get(columnName);
-      assertNotNull(value);
-      assertEquals(conf.get(key),;
-    }
-  }
-  private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
-      NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix)
-      throws IOException {
-    for (String key : isRelatedTo.keySet()) {
-      byte[] columnName = TimelineWriterUtils.join(
-          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
-          Bytes.toBytes(key));
-      byte[] value = infoValues.get(columnName);
-      assertNotNull(value);
-      String isRelatedToEntities =;
-      assertNotNull(isRelatedToEntities);
-      assertEquals(
-          TimelineWriterUtils.getValueAsString(
-              TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR,
-              isRelatedTo.get(key)), isRelatedToEntities);
-    }
+    // Somewhat of a hack, not a separate test in order not to have to deal 
+    // test case order exectution.
+    testAdditionalEntity();
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, Long runid, String appName, TimelineEntity te) {
-    byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES);
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
     assertTrue(rowKeyComponents.length == 7);
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.encodeRunId(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(TimelineWriterUtils.cleanse(appName), 
+    assertEquals(EntityRowKey.invert(runid), 
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
     return true;
+  private void testAdditionalEntity() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    event.setId("foo_event_id");
+    event.setTimestamp(System.currentTimeMillis());
+    event.addInfo("foo_event", "test");
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster2";
+      String user = "user2";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      s.setStartRow(startRow);
+      s.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+        }
+      }
+      assertEquals(1, rowCount);
+    } finally {
+      hbi.stop();
+      hbi.close();
+    }
+  }
   public static void tearDownAfterClass() throws Exception {
diff --git 
new file mode 100644
index 0000000..8b25a83
--- /dev/null
@@ -0,0 +1,129 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
+ * the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+public class TestSeparator {
+  private static String villain = "Dr. Heinz Doofenshmirtz";
+  private static String special =
+      ".   *   |   ?   +   (   )   [   ]   {   }   ^   $  \\ \"";
+  /**
+   *
+   */
+  @Test
+  public void testEncodeDecodeString() {
+    for (Separator separator : Separator.values()) {
+      testEncodeDecode(separator, "");
+      testEncodeDecode(separator, " ");
+      testEncodeDecode(separator, "!");
+      testEncodeDecode(separator, "?");
+      testEncodeDecode(separator, "&");
+      testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "Dr.");
+      testEncodeDecode(separator, "Heinz");
+      testEncodeDecode(separator, "Doofenshmirtz");
+      testEncodeDecode(separator, villain);
+      testEncodeDecode(separator, special);
+      assertNull(separator.encode(null));
+    }
+  }
+  private void testEncodeDecode(Separator separator, String token) {
+    String encoded = separator.encode(token);
+    String decoded = separator.decode(encoded);
+    String msg = "token:" + token + " separator:" + separator + ".";
+    assertEquals(msg, token, decoded);
+  }
+  @Test
+  public void testEncodeDecode() {
+    testEncodeDecode("Dr.", Separator.QUALIFIERS);
+    testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
+    testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
+        Separator.QUALIFIERS);
+    testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
+    testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
+    testEncodeDecode("Platypus...", (Separator) null);
+    testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
+        Separator.VALUES, Separator.SPACE);
+  }
+  /**
+   * Simple test to encode and decode using the same separators and confirm 
+   * we end up with the same as what we started with.
+   *
+   * @param token
+   * @param separators
+   */
+  private static void testEncodeDecode(String token, Separator... separators) {
+    byte[] encoded = Separator.encode(token, separators);
+    String decoded = Separator.decode(encoded, separators);
+    assertEquals(token, decoded);
+  }
+  @Test
+  public void testJoinStripped() {
+    List<String> stringList = new ArrayList<String>(0);
+    stringList.add("nothing");
+    String joined = Separator.VALUES.joinEncoded(stringList);
+    Iterable<String> split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+    stringList = new ArrayList<String>(3);
+    stringList.add("a");
+    stringList.add("b?");
+    stringList.add("c");
+    joined = Separator.VALUES.joinEncoded(stringList);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+    String[] stringArray1 = { "else" };
+    joined = Separator.VALUES.joinEncoded(stringArray1);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
+    String[] stringArray2 = { "d", "e?", "f" };
+    joined = Separator.VALUES.joinEncoded(stringArray2);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
+    List<String> empty = new ArrayList<String>(0);
+    split = Separator.VALUES.splitEncoded(null);
+    assertTrue(Iterables.elementsEqual(empty, split));
+  }
diff --git 
new file mode 100644
index 0000000..4f96f87
--- /dev/null
@@ -0,0 +1,29 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.junit.Test;
+public class TestTimelineWriterUtils {
+  @Test
+  public void test() {
+    // TODO: implement a test for the remaining method in TimelineWriterUtils.
+  }

Reply via email to