http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
index d16fe3a..bb9889e 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
@@ -24,145 +24,148 @@ import java.util.TreeMap;
 
 /**
  * Serialization/deserialization for map type
- *
  */
 @SuppressWarnings("rawtypes")
 public class MapSerDeser implements EntitySerDeser<Map> {
 
-       @SuppressWarnings({ "unchecked" })
-       @Override
-       public Map deserialize(byte[] bytes) {
-               if (bytes == null || bytes.length == 0) {
-                       return null;
-               }
-               final Map map = new TreeMap();
-               int offset = 0;
-               // get size of int array
-               final int size = ByteUtil.bytesToInt(bytes, offset);
-               offset += 4;
-               
-               for (int i = 0; i < size; ++i) {
-                       final int keyID = ByteUtil.bytesToInt(bytes, offset);
-                       offset += 4;
-                       final Class<?> keyClass = 
EntityDefinitionManager.getClassByID(keyID);
-                       if (keyClass == null) {
-                               throw new IllegalArgumentException("Unsupported 
key type ID: " + keyID);
-                       }
-                       final EntitySerDeser keySerDer = 
EntityDefinitionManager.getSerDeser(keyClass);
-                       final int keyLength = ByteUtil.bytesToInt(bytes, 
offset);
-                       offset += 4;
-                       final byte[] keyContent = new byte[keyLength];
-                       System.arraycopy(bytes, offset, keyContent, 0, 
keyLength);
-                       offset += keyLength;
-                       final Object key = keySerDer.deserialize(keyContent);
-                       
-                       final int valueID = ByteUtil.bytesToInt(bytes, offset);
-                       offset += 4;
-                       final Class<?> valueClass = 
EntityDefinitionManager.getClassByID(valueID);
-                       if (valueClass == null) {
-                               throw new IllegalArgumentException("Unsupported 
value type ID: " + valueID);
-                       }
-                       final EntitySerDeser valueSerDer = 
EntityDefinitionManager.getSerDeser(valueClass);
-                       final int valueLength = ByteUtil.bytesToInt(bytes, 
offset);
-                       offset += 4;
-                       final byte[] valueContent = new byte[valueLength];
-                       System.arraycopy(bytes, offset, valueContent, 0, 
valueLength);
-                       offset += valueLength;
-                       final Object value = 
valueSerDer.deserialize(valueContent);
-                       
-                       map.put(key, value);
-               }
-               return map;
-       }
+    @SuppressWarnings({
+        "unchecked"
+        })
+    @Override
+    public Map deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        final Map map = new TreeMap();
+        int offset = 0;
+        // read the number of map entries (first 4 bytes)
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
 
-       /**
-        *  size + key1 type ID + key1 length + key1 binary content + value1 
type id + value length + value1 binary content + ...
-        *   4B        4B             4B             key1 bytes            4B   
           4B              value1 bytes
-        */
-       @SuppressWarnings({ "unchecked" })
-       @Override
-       public byte[] serialize(Map map) {
-               if(map == null)
-                       return null;
-               final int size = map.size();
-               final int[] keyIDs = new int[size];
-               final int[] valueIDs = new int[size];
-               final byte[][] keyBytes = new byte[size][];
-               final byte[][] valueBytes = new byte[size][];
-               
-               int totalSize = 4 + size * 16;
-               int i = 0;
-               Iterator iter = map.entrySet().iterator();
-               while (iter.hasNext()) {
-                       final Map.Entry entry = (Map.Entry)iter.next();
-                       final Object key = entry.getKey();
-                       final Object value = entry.getValue();
-                       Class<?> keyClass = key.getClass();
-                       Class<?> valueClass = NullObject.class;
-                       if (value != null) {
-                               valueClass = value.getClass();
-                       }
-                       int keyTypeID = 
EntityDefinitionManager.getIDBySerDerClass(keyClass);
-                       int valueTypeID = 0; // default null object
-                       if (valueClass != null) {
-                               valueTypeID = 
EntityDefinitionManager.getIDBySerDerClass(valueClass);
-                       }
-                       if (keyTypeID == -1) {
-                               if (key instanceof Map) {
-                                       keyClass = Map.class;
-                                       keyTypeID = 
EntityDefinitionManager.getIDBySerDerClass(keyClass);
-                               } else {
-                                       throw new 
IllegalArgumentException("Unsupported class: " + keyClass.getName());
-                               }
-                       }
-                       if (valueTypeID == -1) {
-                               if (value instanceof Map) {
-                                       valueClass = Map.class;
-                                       valueTypeID = 
EntityDefinitionManager.getIDBySerDerClass(valueClass);
-                               } else {
-                                       throw new 
IllegalArgumentException("Unsupported class: " + valueClass.getName());
-                               }
-                       }
-                       keyIDs[i] = keyTypeID;
-                       valueIDs[i] = valueTypeID;
-                       final EntitySerDeser keySerDer = 
EntityDefinitionManager.getSerDeser(keyClass);
-                       final EntitySerDeser valueSerDer = 
EntityDefinitionManager.getSerDeser(valueClass);
-                       if (keySerDer == null) {
-                               throw new IllegalArgumentException("Unsupported 
class: " + keyClass.getName());
-                       }
-                       if (valueSerDer == null) {
-                               throw new IllegalArgumentException("Unsupported 
class: " + valueClass.getName());
-                       }
-                       keyBytes[i] = keySerDer.serialize(key);
-                       valueBytes[i] = valueSerDer.serialize(value);
-                       totalSize += keyBytes[i].length + valueBytes[i].length;
-                       ++i;
-               }
-               final byte[] result = new byte[totalSize];
-               int offset = 0;
-               ByteUtil.intToBytes(size, result, offset);
-               offset += 4;
-               for (i = 0; i < size; ++i) {
-                       ByteUtil.intToBytes(keyIDs[i], result, offset);
-                       offset += 4;
-                       ByteUtil.intToBytes(keyBytes[i].length, result, offset);
-                       offset += 4;
-                       System.arraycopy(keyBytes[i], 0, result, offset, 
keyBytes[i].length);
-                       offset += keyBytes[i].length;
-                       
-                       ByteUtil.intToBytes(valueIDs[i], result, offset);
-                       offset += 4;
-                       ByteUtil.intToBytes(valueBytes[i].length, result, 
offset);
-                       offset += 4;
-                       System.arraycopy(valueBytes[i], 0, result, offset, 
valueBytes[i].length);
-                       offset += valueBytes[i].length;
-               }
-               return result;
-       }
+        for (int i = 0; i < size; ++i) {
+            final int keyID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final Class<?> keyClass = 
EntityDefinitionManager.getClassByID(keyID);
+            if (keyClass == null) {
+                throw new IllegalArgumentException("Unsupported key type ID: " 
+ keyID);
+            }
+            final EntitySerDeser keySerDer = 
EntityDefinitionManager.getSerDeser(keyClass);
+            final int keyLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final byte[] keyContent = new byte[keyLength];
+            System.arraycopy(bytes, offset, keyContent, 0, keyLength);
+            offset += keyLength;
+            final Object key = keySerDer.deserialize(keyContent);
 
-       @Override
-       public Class<Map> type() {
-               return Map.class;
-       }
-}
+            final int valueID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final Class<?> valueClass = 
EntityDefinitionManager.getClassByID(valueID);
+            if (valueClass == null) {
+                throw new IllegalArgumentException("Unsupported value type ID: 
" + valueID);
+            }
+            final EntitySerDeser valueSerDer = 
EntityDefinitionManager.getSerDeser(valueClass);
+            final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final byte[] valueContent = new byte[valueLength];
+            System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+            offset += valueLength;
+            final Object value = valueSerDer.deserialize(valueContent);
+
+            map.put(key, value);
+        }
+        return map;
+    }
+
+    /**
+     * Layout: size (4B), then per entry: key type ID (4B) + key length (4B) + key bytes
+     * + value type ID (4B) + value length (4B) + value bytes.
+     */
+    @SuppressWarnings({
+        "unchecked"
+        })
+    @Override
+    public byte[] serialize(Map map) {
+        if (map == null) {
+            return null;
+        }
+        final int size = map.size();
+        final int[] keyIDs = new int[size];
+        final int[] valueIDs = new int[size];
+        final byte[][] keyBytes = new byte[size][];
+        final byte[][] valueBytes = new byte[size][];
 
+        int totalSize = 4 + size * 16;
+        int i = 0;
+        Iterator iter = map.entrySet().iterator();
+        while (iter.hasNext()) {
+            final Map.Entry entry = (Map.Entry)iter.next();
+            final Object key = entry.getKey();
+            final Object value = entry.getValue();
+            Class<?> keyClass = key.getClass();
+            Class<?> valueClass = NullObject.class;
+            if (value != null) {
+                valueClass = value.getClass();
+            }
+            int keyTypeID = 
EntityDefinitionManager.getIDBySerDerClass(keyClass);
+            int valueTypeID = 0; // default null object
+            if (valueClass != null) {
+                valueTypeID = 
EntityDefinitionManager.getIDBySerDerClass(valueClass);
+            }
+            if (keyTypeID == -1) {
+                if (key instanceof Map) {
+                    keyClass = Map.class;
+                    keyTypeID = 
EntityDefinitionManager.getIDBySerDerClass(keyClass);
+                } else {
+                    throw new IllegalArgumentException("Unsupported class: " + 
keyClass.getName());
+                }
+            }
+            if (valueTypeID == -1) {
+                if (value instanceof Map) {
+                    valueClass = Map.class;
+                    valueTypeID = 
EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                } else {
+                    throw new IllegalArgumentException("Unsupported class: " + 
valueClass.getName());
+                }
+            }
+            keyIDs[i] = keyTypeID;
+            valueIDs[i] = valueTypeID;
+            final EntitySerDeser keySerDer = 
EntityDefinitionManager.getSerDeser(keyClass);
+            final EntitySerDeser valueSerDer = 
EntityDefinitionManager.getSerDeser(valueClass);
+            if (keySerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + 
keyClass.getName());
+            }
+            if (valueSerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + 
valueClass.getName());
+            }
+            keyBytes[i] = keySerDer.serialize(key);
+            valueBytes[i] = valueSerDer.serialize(value);
+            totalSize += keyBytes[i].length + valueBytes[i].length;
+            ++i;
+        }
+        final byte[] result = new byte[totalSize];
+        int offset = 0;
+        ByteUtil.intToBytes(size, result, offset);
+        offset += 4;
+        for (i = 0; i < size; ++i) {
+            ByteUtil.intToBytes(keyIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(keyBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(keyBytes[i], 0, result, offset, 
keyBytes[i].length);
+            offset += keyBytes[i].length;
+
+            ByteUtil.intToBytes(valueIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(valueBytes[i], 0, result, offset, 
valueBytes[i].length);
+            offset += valueBytes[i].length;
+        }
+        return result;
+    }
+
+    @Override
+    public Class<Map> type() {
+        return Map.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
index 0e3e776..5f5bb6e 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
@@ -21,9 +21,11 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Metric {
-       // interval with million seconds
-       long interval() default 60000;
+    // interval in milliseconds
+    long interval() default 60000;
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
index 06bbed3..4b23ea9 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
@@ -25,44 +25,54 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 public class MetricDefinition implements Writable {
-       private final static Logger LOG = 
LoggerFactory.getLogger(MetricDefinition.class);
-       private long interval;
-       private Class<?> singleTimestampEntityClass;
-       public long getInterval() {
-               return interval;
-       }
-       public void setInterval(long interval) {
-               this.interval = interval;
-       }
-       public Class<?> getSingleTimestampEntityClass() {
-               return singleTimestampEntityClass;
-       }
-       public void setSingleTimestampEntityClass(Class<?> 
singleTimestampEntityClass) {
-               this.singleTimestampEntityClass = singleTimestampEntityClass;
-       }
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetricDefinition.class);
+    private long interval;
+    private Class<?> singleTimestampEntityClass;
 
-       private final static String EMPTY="";
-       @Override
-       public void write(DataOutput out) throws IOException {
-               if(LOG.isDebugEnabled()) LOG.debug("Writing metric definition: 
interval = "+interval+" singleTimestampEntityClass = "+ 
this.singleTimestampEntityClass);
-               out.writeLong(interval);
-               if(this.singleTimestampEntityClass == null){
-                       out.writeUTF(EMPTY);
-               }else {
-                       out.writeUTF(this.singleTimestampEntityClass.getName());
-               }
-       }
+    public long getInterval() {
+        return interval;
+    }
 
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               interval = in.readLong();
-               String singleTimestampEntityClassName = in.readUTF();
-               if(!EMPTY.equals(singleTimestampEntityClassName)) {
-                       try {
-                               this.singleTimestampEntityClass = 
Class.forName(singleTimestampEntityClassName);
-                       } catch (ClassNotFoundException e) {
-                               if(LOG.isDebugEnabled()) LOG.warn("Class " + 
singleTimestampEntityClassName + " not found ");
-                       }
-               }
-       }
+    public void setInterval(long interval) {
+        this.interval = interval;
+    }
+
+    public Class<?> getSingleTimestampEntityClass() {
+        return singleTimestampEntityClass;
+    }
+
+    public void setSingleTimestampEntityClass(Class<?> 
singleTimestampEntityClass) {
+        this.singleTimestampEntityClass = singleTimestampEntityClass;
+    }
+
+    private static final String EMPTY = "";
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing metric definition: interval = " + interval + " 
singleTimestampEntityClass = "
+                      + this.singleTimestampEntityClass);
+        }
+        out.writeLong(interval);
+        if (this.singleTimestampEntityClass == null) {
+            out.writeUTF(EMPTY);
+        } else {
+            out.writeUTF(this.singleTimestampEntityClass.getName());
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        interval = in.readLong();
+        String singleTimestampEntityClassName = in.readUTF();
+        if (!EMPTY.equals(singleTimestampEntityClassName)) {
+            try {
+                this.singleTimestampEntityClass = 
Class.forName(singleTimestampEntityClassName);
+            } catch (ClassNotFoundException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.warn("Class " + singleTimestampEntityClassName + " not 
found ");
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
index 9fb05a3..01536f1 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
@@ -21,7 +21,9 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface NonUniqueIndex {
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
index ff11397..f838590 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
@@ -21,11 +21,12 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface NonUniqueIndexes {
-       
-       public NonUniqueIndex[] value();
+
+    public NonUniqueIndex[] value();
 
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
index 1778788..fd76999 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
@@ -16,22 +16,22 @@
  */
 package org.apache.eagle.log.entity.meta;
 
-public class NullSerDeser implements EntitySerDeser<NullObject>{
+public class NullSerDeser implements EntitySerDeser<NullObject> {
 
-       private static final byte[] EMPTY_NULL_ARRAY = new byte[0];
-       
-       @Override
-       public NullObject deserialize(byte[] bytes) {
-               return null;
-       }
+    private static final byte[] EMPTY_NULL_ARRAY = new byte[0];
 
-       @Override
-       public byte[] serialize(NullObject t) {
-               return EMPTY_NULL_ARRAY;
-       }
+    @Override
+    public NullObject deserialize(byte[] bytes) {
+        return null;
+    }
 
-       @Override
-       public Class<NullObject> type() {
-               return NullObject.class;
-       }
+    @Override
+    public byte[] serialize(NullObject t) {
+        return EMPTY_NULL_ARRAY;
+    }
+
+    @Override
+    public Class<NullObject> type() {
+        return NullObject.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
index cb60016..479cb33 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
@@ -22,19 +22,18 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Partition annotation will impact the rowkey generation for Eagle entities. 
Once an entity class 
- * has defined the partition fields for an Eagle entity, the hash codes of the 
defined partition 
- * fields will be placed just after prefix field, and before timestamp field.
- * 
- *
+ * Partition annotation will impact the rowkey generation for Eagle entities. 
Once an entity class has defined
+ * the partition fields for an Eagle entity, the hash codes of the defined 
partition fields will be placed
+ * just after prefix field, and before timestamp field.
  */
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
-public @interface Partition
-{
+public @interface Partition {
     /**
      * Order in which annotated tags are to be regarded as data partitions.
      */
-    public String[] value() default { };
+    public String[] value() default {};
 
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
index 36f404c..587243d 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
@@ -21,8 +21,10 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Prefix {
-       String value() default "";
-}
\ No newline at end of file
+    String value() default "";
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
index 64d73dd..7f849ff 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
@@ -27,74 +27,83 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-public class Qualifier implements Writable{
-       private final static Logger LOG = 
LoggerFactory.getLogger(Qualifier.class);
+public class Qualifier implements Writable {
+    private static final Logger LOG = LoggerFactory.getLogger(Qualifier.class);
 
-       private String displayName;
-       private String qualifierName;
-       private EntitySerDeser<Object> serDeser;
-       @JsonIgnore
-       public EntitySerDeser<Object> getSerDeser() {
-               return serDeser;
-       }
-       public void setSerDeser(EntitySerDeser<Object> serDeser) {
-               this.serDeser = serDeser;
-       }
-       public String getDisplayName() {
-               return displayName;
-       }
-       public void setDisplayName(String displayName) {
-               this.displayName = displayName;
-       }
-       public String getQualifierName() {
-               return qualifierName;
-       }
-       public void setQualifierName(String qualifierName) {
-               this.qualifierName = qualifierName;
-       }
-       
-       public String toString(){
-               StringBuffer sb = new StringBuffer();
-               sb.append("displayName:");
-               sb.append(displayName);
-               sb.append(",");
-               sb.append("qualifierName:");
-               sb.append(qualifierName);
-               sb.append(",");
-               sb.append("serDeser class:");
-               sb.append(serDeser.getClass().getName());
-               return sb.toString();
-       }
+    private String displayName;
+    private String qualifierName;
+    private EntitySerDeser<Object> serDeser;
 
-       @Override
-       public void write(DataOutput out) throws IOException {
-               out.writeUTF(displayName);
-               out.writeUTF(qualifierName);
-               out.writeUTF(serDeser.getClass().getName());
-       }
+    @JsonIgnore
+    public EntitySerDeser<Object> getSerDeser() {
+        return serDeser;
+    }
 
-       private final static Map<String, EntitySerDeser> _entitySerDeserCache = 
new HashMap<String,EntitySerDeser>();
+    public void setSerDeser(EntitySerDeser<Object> serDeser) {
+        this.serDeser = serDeser;
+    }
 
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               displayName = in.readUTF();
-               qualifierName = in.readUTF();
-               String serDeserClassName = in.readUTF();
+    public String getDisplayName() {
+        return displayName;
+    }
 
-               EntitySerDeser _cached = 
_entitySerDeserCache.get(serDeserClassName);
-               if(_cached != null){
-                       this.serDeser = _cached;
-               }else {
-                       try {
-                               if (LOG.isDebugEnabled()) LOG.debug("Creating 
new instance for " + serDeserClassName);
-                               Class serDeserClass = 
Class.forName(serDeserClassName);
-                               this.serDeser = (EntitySerDeser) 
serDeserClass.newInstance();
-                               _entitySerDeserCache.put(serDeserClassName, 
this.serDeser);
-                       } catch (Exception e) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.warn("Class not found for " + 
serDeserClassName + ": " + e.getMessage(), e);
-                               }
-                       }
-               }
-       }
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
+    public String getQualifierName() {
+        return qualifierName;
+    }
+
+    public void setQualifierName(String qualifierName) {
+        this.qualifierName = qualifierName;
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("displayName:");
+        sb.append(displayName);
+        sb.append(",");
+        sb.append("qualifierName:");
+        sb.append(qualifierName);
+        sb.append(",");
+        sb.append("serDeser class:");
+        sb.append(serDeser.getClass().getName());
+        return sb.toString();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeUTF(displayName);
+        out.writeUTF(qualifierName);
+        out.writeUTF(serDeser.getClass().getName());
+    }
+
+    private static final Map<String, EntitySerDeser> _entitySerDeserCache = 
new HashMap<String, EntitySerDeser>();
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        displayName = in.readUTF();
+        qualifierName = in.readUTF();
+        String serDeserClassName = in.readUTF();
+
+        EntitySerDeser _cached = _entitySerDeserCache.get(serDeserClassName);
+        if (_cached != null) {
+            this.serDeser = _cached;
+        } else {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Creating new instance for " + 
serDeserClassName);
+                }
+                Class serDeserClass = Class.forName(serDeserClassName);
+                this.serDeser = (EntitySerDeser)serDeserClass.newInstance();
+                _entitySerDeserCache.put(serDeserClassName, this.serDeser);
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.warn("Class not found for " + serDeserClassName + ": " 
+ e.getMessage(), e);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
index 22d70ed..f6e9700 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
@@ -21,8 +21,10 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Service {
-       String value() default "";
+    String value() default "";
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
index 8c712d0..6dc15c5 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
@@ -23,9 +23,10 @@ import java.lang.annotation.Target;
 
 /**
  * This class is for service client for generic entity creation API (entities 
and metrics)
- *
  */
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ServicePath {
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
index 635065b..2e5fa8d 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
@@ -22,73 +22,73 @@ import org.apache.eagle.common.ByteUtil;
 
 /**
  * String array entity serializer and deserializer
- *
  */
 public class StringArraySerDeser implements EntitySerDeser<String[]> {
 
-       public static final int MAX_STRING_LENGTH = 65535;
-       public static final String UTF_8 = "UTF-8";
-       
-       @Override
-       public String[] deserialize(byte[] bytes) {
-               if(bytes == null || bytes.length < 4)
-                       return null;
-               int offset = 0;
-               // get size of int array
-               final int size = ByteUtil.bytesToInt(bytes, offset);
-               offset += 4;
-               final String[] strings = new String[size];
-               try {
-                       for(int i = 0; i < size; i++) {
-                               final int len = ByteUtil.bytesToInt(bytes, 
offset);
-                               offset += 4;
-                               strings[i] = new String(bytes, offset, len, 
UTF_8);
-                               offset += len;
-                       }
-               } catch (UnsupportedEncodingException e) {
-                       throw new IllegalArgumentException("Invalid byte 
array");
-               }
-               return strings;
-       }
-       
-       /**
-        *  size + str1 length + str1 + str2 length + str2 + ...
-        *   4B        4B         n1B        4B        n2B
-        *  
-        * @param obj
-        * @return
-        */
-       @Override
-       public byte[] serialize(String[] array) {
-               if(array == null)
-                       return null;
-               final int size = array.length;
-               final byte[][] tmp = new byte[size][];
-               int total = 4 + 4 * size;
-               for (int i = 0; i < size; ++i) {
-                       try {
-                               tmp[i] = array[i].getBytes(UTF_8);
-                       } catch (UnsupportedEncodingException e) {
-                               throw new IllegalArgumentException("String 
doesn't support UTF-8 encoding: " + array[i]);
-                       }
-                       total += tmp[i].length;
-               }
-               final byte[] result = new byte[total];
-               int offset = 0;
-               ByteUtil.intToBytes(size, result, offset);
-               offset += 4;
-               for (int i = 0; i < size; ++i) {
-                       ByteUtil.intToBytes(tmp[i].length, result, offset);
-                       offset += 4;
-                       System.arraycopy(tmp[i], 0, result, offset, 
tmp[i].length);
-                       offset += tmp[i].length;
-               }
-               return result;
-       }
+    public static final int MAX_STRING_LENGTH = 65535;
+    public static final String UTF_8 = "UTF-8";
+
+    /**
+     * Decodes a byte array in the layout written by {@link #serialize(String[])}: an int element
+     * count followed, per element, by an int length and that many UTF-8 bytes. Each int is read
+     * with {@code ByteUtil.bytesToInt} and occupies 4 bytes.
+     *
+     * @param bytes encoded array; may be null
+     * @return the decoded strings, or null when {@code bytes} is null or shorter than 4 bytes
+     * @throws IllegalArgumentException if the element count or a string length does not fit
+     *         inside {@code bytes}
+     */
+    @Override
+    public String[] deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length < 4) {
+            return null;
+        }
+        int offset = 0;
+        // number of strings in the array
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+        // Every element needs at least its own 4-byte length prefix, so a negative or larger
+        // count can only come from corrupt input; reject it before allocating the result.
+        if (size < 0 || size > (bytes.length - offset) / 4) {
+            throw new IllegalArgumentException("Invalid byte array: element count " + size
+                + " does not fit in " + bytes.length + " bytes");
+        }
+        final String[] strings = new String[size];
+        try {
+            for (int i = 0; i < size; i++) {
+                if (bytes.length - offset < 4) {
+                    throw new IllegalArgumentException("Invalid byte array: truncated before length of element " + i);
+                }
+                final int len = ByteUtil.bytesToInt(bytes, offset);
+                offset += 4;
+                if (len < 0 || len > bytes.length - offset) {
+                    throw new IllegalArgumentException("Invalid byte array: length " + len
+                        + " of element " + i + " exceeds remaining " + (bytes.length - offset) + " bytes");
+                }
+                strings[i] = new String(bytes, offset, len, UTF_8);
+                offset += len;
+            }
+        } catch (UnsupportedEncodingException e) {
+            // keep the cause so the original failure is not lost
+            throw new IllegalArgumentException("Invalid byte array", e);
+        }
+        return strings;
+    }
+
+    /**
+     * Encodes a string array as an element count followed by length-prefixed UTF-8 strings.
+     *
+     * <pre>
+     * size + str1 length + str1 + str2 length + str2 + ...
+     *  4B        4B         n1B        4B        n2B
+     * </pre>
+     *
+     * @param array strings to encode; may be null, but its elements must not be null
+     * @return the encoded bytes, or null when {@code array} is null
+     * @throws IllegalArgumentException if UTF-8 encoding is reported as unsupported
+     */
+    @Override
+    public byte[] serialize(String[] array) {
+        if (array == null) {
+            return null;
+        }
+        final int size = array.length;
+        final byte[][] tmp = new byte[size][];
+        // 4 bytes for the count plus a 4-byte length prefix per element; payloads are added below
+        int total = 4 + 4 * size;
+        for (int i = 0; i < size; ++i) {
+            try {
+                tmp[i] = array[i].getBytes(UTF_8);
+            } catch (UnsupportedEncodingException e) {
+                // keep the cause so the original failure is not lost
+                throw new IllegalArgumentException("String doesn't support UTF-8 encoding: " + array[i], e);
+            }
+            total += tmp[i].length;
+        }
+        final byte[] result = new byte[total];
+        int offset = 0;
+        ByteUtil.intToBytes(size, result, offset);
+        offset += 4;
+        for (int i = 0; i < size; ++i) {
+            ByteUtil.intToBytes(tmp[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(tmp[i], 0, result, offset, tmp[i].length);
+            offset += tmp[i].length;
+        }
+        return result;
+    }
 
-       @Override
-       public Class<String[]> type() {
-               return String[].class;
-       }
+    /** Returns the class object of the value type this serializer handles, {@code String[]}. */
+    @Override
+    public Class<String[]> type() {
+        return String[].class;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
index eef6e4f..532e27a 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
@@ -18,22 +18,24 @@ package org.apache.eagle.log.entity.meta;
 
 public class StringSerDeser implements EntitySerDeser<String> {
 
-       public StringSerDeser(){}
+    /** Creates a serializer; this class declares no fields, so instances carry no state. */
+    public StringSerDeser() {
+    }
 
-       @Override
-       public String deserialize(byte[] bytes){
-               return new String(bytes);
-       }
-       
-       @Override
-       public byte[] serialize(String obj){
-               if(obj == null)
-                       return null;
-               return obj.getBytes();
-       }
+    /**
+     * Decodes bytes into a string using the platform default charset, the counterpart of
+     * {@code serialize}, which encodes with the platform default charset as well.
+     *
+     * @param bytes encoded string; may be null
+     * @return the decoded string, or null when {@code bytes} is null
+     */
+    @Override
+    public String deserialize(byte[] bytes) {
+        // serialize() maps null to null; mirror that here instead of failing with an NPE
+        if (bytes == null) {
+            return null;
+        }
+        return new String(bytes);
+    }
 
-       @Override
-       public Class<String> type() {
-               return String.class;
-       }
+    /**
+     * Encodes a string into bytes using the platform default charset.
+     *
+     * @param obj string to encode; may be null
+     * @return the encoded bytes, or null when {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(String obj) {
+        return obj == null ? null : obj.getBytes();
+    }
+
+    /** Returns the class object of the value type this serializer handles, {@code String}. */
+    @Override
+    public Class<String> type() {
+        return String.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
index ac722cd..d36487a 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
@@ -21,8 +21,10 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Table {
-       String value() default "";
+    String value() default "";
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
index ac9b328..0f5c38d 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
@@ -27,5 +27,7 @@ import java.lang.annotation.Target;
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Tags {
-       String[] value() default {""};
+    String[] value() default {
+                              ""
+    };
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
index 01023bc..e708f14 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
@@ -21,8 +21,10 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface TimeSeries {
-       boolean value() default true;
+    boolean value() default true;
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
index 43a7073..5744ee8 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
@@ -30,49 +30,51 @@ import org.apache.eagle.log.entity.InternalLog;
 import org.apache.eagle.common.EagleBase64Wrapper;
 
 public class GenericByRowkeyReader {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericByRowkeyReader.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericByRowkeyReader.class);
 
-       private TaggedLogObjectMapper mapper;
-       private String table;
-       private String columnFamily;
-       private boolean outputAll;
-       private List<String> outputColumns;
-       private GenericReader.EntityFactory entityFactory;
-       
-       public GenericByRowkeyReader(TaggedLogObjectMapper mapper, 
GenericReader.EntityFactory entityFactory, String table, String columnFamily, 
boolean outputAll, List<String> outputColumns){
-               this.mapper = mapper;
-               this.entityFactory = entityFactory;
-               this.table = table;
-               this.columnFamily = columnFamily;
-               this.outputAll = outputAll;
-               this.outputColumns = outputColumns;
-       }
-       
-       public List<TaggedLogAPIEntity> read(List<String> rowkeys) throws 
IOException{
-               HBaseLogByRowkeyReader reader = new 
HBaseLogByRowkeyReader(this.table, this.columnFamily, 
-                               outputAll, outputColumns);
-               List<TaggedLogAPIEntity> entities = new 
ArrayList<TaggedLogAPIEntity>();
-               try{
-                       reader.open();
-                       for(String rowkeyString : rowkeys){
-                               byte[] rowkey = 
EagleBase64Wrapper.decode(rowkeyString);
-                               InternalLog log = reader.get(rowkey);
-                               TaggedLogAPIEntity entity = 
entityFactory.create();
-                               entities.add(entity);
-                               entity.setTags(log.getTags());
-                               entity.setTimestamp(log.getTimestamp());
-                               entity.setEncodedRowkey(log.getEncodedRowkey());
-                               entity.setPrefix(log.getPrefix());
-                               Map<String, byte[]> qualifierValues = 
log.getQualifierValues();
-                               mapper.populateQualifierValues(entity, 
qualifierValues);
-                       }
-               }catch(IOException ex){
-                       LOG.error("Fail read by rowkey", ex);
-                       throw ex;
-               }finally{
-                       reader.close();
-               }
-               
-               return entities;
-       }
+    private TaggedLogObjectMapper mapper;
+    private String table;
+    private String columnFamily;
+    private boolean outputAll;
+    private List<String> outputColumns;
+    private GenericReader.EntityFactory entityFactory;
+
+    /**
+     * Creates a reader that loads entities by rowkey; the arguments are only stored here.
+     *
+     * @param mapper used by {@code read} to populate qualifier values on each entity
+     * @param entityFactory creates the empty entity that {@code read} fills in for each rowkey
+     * @param table table argument handed to {@code HBaseLogByRowkeyReader}
+     * @param columnFamily column family argument handed to {@code HBaseLogByRowkeyReader}
+     * @param outputAll forwarded unchanged to {@code HBaseLogByRowkeyReader}
+     * @param outputColumns forwarded unchanged to {@code HBaseLogByRowkeyReader}
+     */
+    public GenericByRowkeyReader(TaggedLogObjectMapper mapper, 
GenericReader.EntityFactory entityFactory,
+                                 String table, String columnFamily, boolean 
outputAll,
+                                 List<String> outputColumns) {
+        this.mapper = mapper;
+        this.entityFactory = entityFactory;
+        this.table = table;
+        this.columnFamily = columnFamily;
+        this.outputAll = outputAll;
+        this.outputColumns = outputColumns;
+    }
+
+    /**
+     * Loads one entity per rowkey.
+     *
+     * <p>Each rowkey string is decoded with {@code EagleBase64Wrapper.decode}, fetched through an
+     * {@code HBaseLogByRowkeyReader}, and copied into a new entity from the factory (tags,
+     * timestamp, encoded rowkey, prefix, then qualifier values via the mapper). The underlying
+     * reader is closed whether or not the read succeeds.
+     *
+     * @param rowkeys encoded rowkeys to look up
+     * @return the entities, in the order of {@code rowkeys}
+     * @throws IOException if the underlying read fails; the error is logged before rethrowing
+     */
+    public List<TaggedLogAPIEntity> read(List<String> rowkeys) throws IOException {
+        final HBaseLogByRowkeyReader rowReader =
+            new HBaseLogByRowkeyReader(this.table, this.columnFamily, outputAll, outputColumns);
+        final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>();
+        try {
+            rowReader.open();
+            for (String encoded : rowkeys) {
+                final InternalLog row = rowReader.get(EagleBase64Wrapper.decode(encoded));
+                final TaggedLogAPIEntity loaded = entityFactory.create();
+                result.add(loaded);
+                loaded.setTags(row.getTags());
+                loaded.setTimestamp(row.getTimestamp());
+                loaded.setEncodedRowkey(row.getEncodedRowkey());
+                loaded.setPrefix(row.getPrefix());
+                mapper.populateQualifierValues(loaded, row.getQualifierValues());
+            }
+        } catch (IOException ex) {
+            LOG.error("Fail read by rowkey", ex);
+            throw ex;
+        } finally {
+            rowReader.close();
+        }
+
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
index e97b522..f2acd85 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
@@ -32,20 +32,19 @@ import 
org.apache.eagle.log.entity.meta.EntityDefinitionManager;
 import org.apache.eagle.log.entity.meta.IndexDefinition;
 
 public class GenericDeleter {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericDeleter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericDeleter.class);
 
-       private final HBaseLogDeleter deleter;
-       private final HBaseLogByRowkeyReader reader;
-       
-       
-       public GenericDeleter(EntityDefinition ed) {
-               this(ed.getTable(), ed.getColumnFamily());
-       }
-       
-       public GenericDeleter(String table, String columnFamily) {
-               this.deleter = new HBaseLogDeleter(table, columnFamily);
-               this.reader = new HBaseLogByRowkeyReader(table, columnFamily, 
true, null);
-       }
+    private final HBaseLogDeleter deleter;
+    private final HBaseLogByRowkeyReader reader;
+
+    /** Creates a deleter for the table and column family reported by the given entity definition. */
+    public GenericDeleter(EntityDefinition ed) {
+        this(ed.getTable(), ed.getColumnFamily());
+    }
+
+    /**
+     * Creates a deleter bound to one table and column family. Builds the {@code HBaseLogDeleter}
+     * that removes rows and the {@code HBaseLogByRowkeyReader} used to look rows up by rowkey.
+     */
+    public GenericDeleter(String table, String columnFamily) {
+        this.deleter = new HBaseLogDeleter(table, columnFamily);
+        this.reader = new HBaseLogByRowkeyReader(table, columnFamily, true, 
null);
+    }
 
     public void deleteByRowkeys(List<byte[]> rowkeys) throws Exception {
         try {
@@ -66,70 +65,73 @@ public class GenericDeleter {
             throw e;
         }
     }
-       
-       public List<String> delete(List<? extends TaggedLogAPIEntity> entities) 
throws Exception{
+
+    /**
+     * Deletes the given entities together with their index rows.
+     *
+     * <p>Entities are grouped by class. For each group the prefix/timestamp fix-up is applied,
+     * rowkeys are computed with {@code RowkeyHelper}, and, when the entity definition declares
+     * indexes, the stored rows are read back so that one index rowkey per index can be added.
+     * All collected rowkeys are then deleted.
+     *
+     * <p>Both the deleter and, if it was opened, the index reader are closed before returning,
+     * whether or not the deletion succeeds.
+     *
+     * @param entities entities to delete
+     * @return the URL-safe Base64 form of every rowkey handed to the deleter
+     * @throws Exception if opening, reading or deleting fails; IOExceptions are logged first
+     */
+    public List<String> delete(List<? extends TaggedLogAPIEntity> entities) throws Exception {
         List<String> encodedRowkey = new LinkedList<String>();
-               try{
-                       deleter.open();
-                       final Map<Class<? extends TaggedLogAPIEntity>, 
List<TaggedLogAPIEntity>> entityClassMap = classifyEntities(entities);
-                       for (Map.Entry<Class<? extends TaggedLogAPIEntity>, 
List<TaggedLogAPIEntity>> entry : entityClassMap.entrySet()) {
-                               final Class<? extends TaggedLogAPIEntity> clazz 
= entry.getKey();
-                               final List<? extends TaggedLogAPIEntity> 
entityList = entry.getValue();
+        // Tracks whether the index reader was opened so it can be released in the finally block.
+        boolean readerOpened = false;
+        try {
+            deleter.open();
+            final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entityClassMap = classifyEntities(entities);
+            for (Map.Entry<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entry : entityClassMap
+                .entrySet()) {
+                final Class<? extends TaggedLogAPIEntity> clazz = entry.getKey();
+                final List<? extends TaggedLogAPIEntity> entityList = entry.getValue();
 
-                               final EntityDefinition entityDef = 
EntityDefinitionManager.getEntityDefinitionByEntityClass(clazz);
-                               // TODO: we should fix this hardcoded prefix 
hack
-                               fixPrefixAndTimestampIssue(entityList, 
entityDef);
+                final EntityDefinition entityDef = EntityDefinitionManager
+                    .getEntityDefinitionByEntityClass(clazz);
+                // TODO: we should fix this hardcoded prefix hack
+                fixPrefixAndTimestampIssue(entityList, entityDef);
 
-                               final List<byte[]> rowkeys = 
RowkeyHelper.getRowkeysByEntities(entityList, entityDef);
-                               // Check index
-                               final IndexDefinition[] indexes = 
entityDef.getIndexes();
-                               if (indexes != null && indexes.length > 0) {
-                                       reader.open();
-                                       final List<InternalLog> logs = 
reader.get(rowkeys);
-                                       final List<TaggedLogAPIEntity> 
newEntities = HBaseInternalLogHelper.buildEntities(logs, entityDef);
-                                       for (TaggedLogAPIEntity entity : 
newEntities) {
-                                               // Add index rowkeys
-                                               for (IndexDefinition index : 
indexes) {
-                                                       final byte[] 
indexRowkey = index.generateIndexRowkey(entity);
-                                                       
rowkeys.add(indexRowkey);
-                                               }
-                                       }
-                               }
-                for(byte[] rowkey:rowkeys) {
+                final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entityList, entityDef);
+                // Check index
+                final IndexDefinition[] indexes = entityDef.getIndexes();
+                if (indexes != null && indexes.length > 0) {
+                    // Open the index reader lazily and only once per call; previously it was
+                    // re-opened for every entity class and never closed.
+                    if (!readerOpened) {
+                        reader.open();
+                        readerOpened = true;
+                    }
+                    final List<InternalLog> logs = reader.get(rowkeys);
+                    final List<TaggedLogAPIEntity> newEntities = HBaseInternalLogHelper
+                        .buildEntities(logs, entityDef);
+                    for (TaggedLogAPIEntity entity : newEntities) {
+                        // Add index rowkeys
+                        for (IndexDefinition index : indexes) {
+                            final byte[] indexRowkey = index.generateIndexRowkey(entity);
+                            rowkeys.add(indexRowkey);
+                        }
+                    }
+                }
+                for (byte[] rowkey : rowkeys) {
                     
encodedRowkey.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
                 }
-                               deleter.deleteRowkeys(rowkeys);
-                       }
-               }catch(IOException ioe){
-                       LOG.error("Fail writing tagged log", ioe);
-                       throw ioe;
-               }finally{
-                       deleter.close();
-               }
+                deleter.deleteRowkeys(rowkeys);
+            }
+        } catch (IOException ioe) {
+            LOG.error("Fail deleting tagged log", ioe);
+            throw ioe;
+        } finally {
+            try {
+                deleter.close();
+            } finally {
+                // release the index reader even if closing the deleter fails
+                if (readerOpened) {
+                    reader.close();
+                }
+            }
+        }
         return encodedRowkey;
-       }
+    }
 
-       private void fixPrefixAndTimestampIssue(List<? extends 
TaggedLogAPIEntity> entities, EntityDefinition entityDef) {
-               for (TaggedLogAPIEntity e : entities) {
-                       e.setPrefix(entityDef.getPrefix());
-                       if (!entityDef.isTimeSeries()) {
-                               
e.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, 
then actually stored 0
-                       }
-               }
-       }
+    /**
+     * Stamps every entity with the prefix of its entity definition and, for definitions that are
+     * not time series, overwrites the timestamp with
+     * {@code EntityConstants.FIXED_WRITE_TIMESTAMP}.
+     */
+    private void fixPrefixAndTimestampIssue(List<? extends TaggedLogAPIEntity> entities,
+                                            EntityDefinition entityDef) {
+        for (TaggedLogAPIEntity entity : entities) {
+            entity.setPrefix(entityDef.getPrefix());
+            if (entityDef.isTimeSeries()) {
+                continue;
+            }
+            // set timestamp to MAX, then actually stored 0
+            entity.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP);
+        }
+    }
 
-       private Map<Class<? extends TaggedLogAPIEntity>, 
List<TaggedLogAPIEntity>> classifyEntities(List<? extends TaggedLogAPIEntity> 
entities) {
-               final Map<Class<? extends TaggedLogAPIEntity>, 
List<TaggedLogAPIEntity>> result = new 
-                               HashMap<Class<? extends TaggedLogAPIEntity>, 
List<TaggedLogAPIEntity>>();
-               for (TaggedLogAPIEntity entity : entities) {
-                       final Class<? extends TaggedLogAPIEntity> clazz = 
entity.getClass();
-                       List<TaggedLogAPIEntity> list = result.get(clazz);
-                       if (list == null) {
-                               list = new ArrayList<TaggedLogAPIEntity>();
-                               result.put(clazz, list);
-                       }
-                       list.add(entity);
-               }
-               return result;
-       }
+    /**
+     * Groups entities by their runtime class.
+     *
+     * @param entities entities to group
+     * @return a map from each runtime class to its entities, in their original relative order
+     */
+    private Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> classifyEntities(List<? extends TaggedLogAPIEntity> entities) {
+        final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> grouped =
+            new HashMap<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>>();
+        for (TaggedLogAPIEntity entity : entities) {
+            final Class<? extends TaggedLogAPIEntity> type = entity.getClass();
+            // first entity of this class: start its bucket
+            if (!grouped.containsKey(type)) {
+                grouped.put(type, new ArrayList<TaggedLogAPIEntity>());
+            }
+            grouped.get(type).add(entity);
+        }
+        return grouped;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
index 76e314b..09ff861 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
@@ -32,84 +32,87 @@ import org.apache.eagle.log.entity.InternalLog;
 import org.apache.eagle.common.DateTimeUtil;
 
 public class GenericReader {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericReader.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericReader.class);
 
-       public interface EntityFactory{
-               public TaggedLogAPIEntity create();
-       }
-       
-       private Schema schema;
-       private EntityFactory entityFactory;
-       private TaggedLogObjectMapper mapper;
-       
-       public GenericReader(TaggedLogObjectMapper mapper, Schema schema, 
EntityFactory factory){
-               this.mapper = mapper;
-               this.schema = schema;
-               this.entityFactory = factory;
-       }
-       
-       public List<TaggedLogAPIEntity> read(String startTime, 
-                       String endTime, List<String> tagNameValues, 
List<String> outputTags, 
-                       List<String> outputFields, String startRowkey, int 
pageSize) throws Exception{
-               Date start = DateTimeUtil.humanDateToDate(startTime);
-               Date end = DateTimeUtil.humanDateToDate(endTime);
-               
-               // decode the query parameters
-               // TODO should support one tag has multiple tag values
-               Map<String, List<String>> searchTags = new HashMap<String, 
List<String>>();
-               for(String tagNameValue : tagNameValues){
-                       String[] tmp = tagNameValue.split("=");
-                       if(tmp == null || tmp.length <=1){
-                               continue; // silently ignore this parameter
-                       }
-                       List<String> tagValues = searchTags.get(tmp[0]);
-                       if(tagValues == null){
-                               tagValues = new ArrayList<String>();
-                               searchTags.put(tmp[0], tagValues);
-                       }
-                       tagValues.add(tmp[1]);
-               }
-               
-               int numTags = outputTags.size();
-               int numFields = outputFields.size();
-               byte[][] outputQualifiers = new byte[numTags+numFields][];
-               int i = 0;
-               for(String tag : outputTags){
-                       outputQualifiers[i++] = tag.getBytes();
-               }
-               for(String field : outputFields){
-                       outputQualifiers[i++] = field.getBytes();
-               }
-               // shortcut to avoid read when pageSize=0
-               List<TaggedLogAPIEntity> entities = new 
ArrayList<TaggedLogAPIEntity>();
-               if(pageSize <= 0){
-                       return entities; // return empty entities
-               }
+    /** Supplies the empty entity instances that {@code read} fills in for each log it returns. */
+    public interface EntityFactory {
+        /** Creates a new, unpopulated entity. */
+        TaggedLogAPIEntity create();
+    }
 
-               HBaseLogReader reader = new HBaseLogReader(schema, start, end, 
searchTags, startRowkey, outputQualifiers);
-               try{
-                       reader.open();
-                       InternalLog log;
-                       int count = 0;
-                       while ((log = reader.read()) != null) {
-                               TaggedLogAPIEntity entity = 
entityFactory.create();
-                               entity.setTags(log.getTags());
-                               entity.setTimestamp(log.getTimestamp());
-                               entity.setEncodedRowkey(log.getEncodedRowkey());
-                               entity.setPrefix(log.getPrefix());
-                               entities.add(entity);
-                               
-                               Map<String, byte[]> qualifierValues = 
log.getQualifierValues();
-                               mapper.populateQualifierValues(entity, 
qualifierValues);
-                               if(++count == pageSize)
-                                       break;
-                       }
-               }catch(IOException ioe){
-                       LOG.error("Fail reading log", ioe);
-                       throw ioe;
-               }finally{
-                       reader.close();
-               }               
-               return entities;
-       }
+    private Schema schema;
+    private EntityFactory entityFactory;
+    private TaggedLogObjectMapper mapper;
+
+    /**
+     * Creates a reader; the arguments are only stored here.
+     *
+     * @param mapper used by {@code read} to populate qualifier values on each entity
+     * @param schema handed to {@code HBaseLogReader} when reading
+     * @param factory creates the empty entity that {@code read} fills in for each log
+     */
+    public GenericReader(TaggedLogObjectMapper mapper, Schema schema, 
EntityFactory factory) {
+        this.mapper = mapper;
+        this.schema = schema;
+        this.entityFactory = factory;
+    }
+
+    /**
+     * Reads at most {@code pageSize} entities for a time range and a set of tag filters.
+     *
+     * @param startTime range start, parsed with {@code DateTimeUtil.humanDateToDate}
+     * @param endTime range end, parsed with {@code DateTimeUtil.humanDateToDate}
+     * @param tagNameValues filters of the form {@code name=value}. Entries that do not split into
+     *        at least two parts on {@code '='} are silently ignored, and only the part between
+     *        the first and second {@code '='} is used as the value
+     * @param outputTags tag names, converted to bytes and passed to the reader as qualifiers
+     * @param outputFields field names, converted to bytes and passed to the reader as qualifiers
+     * @param startRowkey passed through to {@code HBaseLogReader}
+     * @param pageSize maximum number of entities; a value of 0 or less returns an empty list
+     *        without opening a reader
+     * @return the entities read, at most {@code pageSize} of them
+     * @throws Exception if date parsing or reading fails; IOExceptions are logged first
+     */
+    public List<TaggedLogAPIEntity> read(String startTime, String endTime, List<String> tagNameValues,
+                                         List<String> outputTags, List<String> outputFields,
+                                         String startRowkey, int pageSize)
+        throws Exception {
+
+        // decode the query parameters
+        // TODO should support one tag has multiple tag values
+        final Map<String, List<String>> searchTags = new HashMap<String, List<String>>();
+        for (String tagNameValue : tagNameValues) {
+            final String[] parts = tagNameValue.split("=");
+            if (parts.length < 2) {
+                continue; // silently ignore this parameter
+            }
+            if (!searchTags.containsKey(parts[0])) {
+                searchTags.put(parts[0], new ArrayList<String>());
+            }
+            searchTags.get(parts[0]).add(parts[1]);
+        }
+
+        // tags first, then fields
+        // NOTE(review): getBytes() uses the platform default charset - confirm that matches
+        // how qualifier names are encoded when they are written.
+        final byte[][] outputQualifiers = new byte[outputTags.size() + outputFields.size()][];
+        int next = 0;
+        for (String tag : outputTags) {
+            outputQualifiers[next] = tag.getBytes();
+            next++;
+        }
+        for (String field : outputFields) {
+            outputQualifiers[next] = field.getBytes();
+            next++;
+        }
+
+        // shortcut to avoid read when pageSize=0
+        final List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+        if (pageSize <= 0) {
+            return entities; // return empty entities
+        }
+
+        final Date start = DateTimeUtil.humanDateToDate(startTime);
+        final Date end = DateTimeUtil.humanDateToDate(endTime);
+        final HBaseLogReader reader =
+            new HBaseLogReader(schema, start, end, searchTags, startRowkey, outputQualifiers);
+        try {
+            reader.open();
+            // stop at pageSize entities or when the reader runs out, whichever comes first
+            for (int count = 0; count < pageSize; count++) {
+                final InternalLog log = reader.read();
+                if (log == null) {
+                    break;
+                }
+                final TaggedLogAPIEntity entity = entityFactory.create();
+                entity.setTags(log.getTags());
+                entity.setTimestamp(log.getTimestamp());
+                entity.setEncodedRowkey(log.getEncodedRowkey());
+                entity.setPrefix(log.getPrefix());
+                entities.add(entity);
+
+                mapper.populateQualifierValues(entity, log.getQualifierValues());
+            }
+        } catch (IOException ioe) {
+            LOG.error("Fail reading log", ioe);
+            throw ioe;
+        } finally {
+            reader.close();
+        }
+        return entities;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
index 3d29237..9f3765a 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
@@ -32,62 +32,62 @@ import org.slf4j.LoggerFactory;
 import org.apache.eagle.common.EagleBase64Wrapper;
 
 public class GenericWriter {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericWriter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericWriter.class);
 
-       private String table;
-       private String columnFamily;
-       private TaggedLogObjectMapper mapper;
-       
-       public GenericWriter(TaggedLogObjectMapper mapper, String table, String 
columnFamily){
-               this.mapper = mapper;
-               this.table = table;
-               this.columnFamily = columnFamily;
-       }
-       
-       public List<String> write(List<? extends TaggedLogAPIEntity> entities) 
throws IOException{
-               HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily);
-               List<String> rowkeys = new ArrayList<String>();
-               
-               try{
-                       writer.open();
-                       for(TaggedLogAPIEntity entity : entities){
-                               InternalLog log = new InternalLog();
-                               Map<String, String> inputTags = 
entity.getTags();
-                               Map<String, String> tags = new TreeMap<String, 
String>();
-                               for(Map.Entry<String, String> entry : 
inputTags.entrySet()){
-                                       tags.put(entry.getKey(), 
entry.getValue());
-                               }
-                               log.setTags(tags);
-                               log.setTimestamp(entity.getTimestamp());
-                               log.setPrefix(entity.getPrefix());
-                               
log.setQualifierValues(mapper.createQualifierValues(entity));
-                               byte[] rowkey  = writer.write(log);
-                               
rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-                       }
-               }catch(IOException ioe){
-                       LOG.error("Fail writing tagged log", ioe);
-                       throw ioe;
-               }finally{
-                       writer.close();
-               }
-               return rowkeys;
-       }
-       
-       public void updateByRowkey(List<? extends TaggedLogAPIEntity> entities) 
throws IOException{
-               HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily);
-               try{
-                       writer.open();
-                       for(TaggedLogAPIEntity entity : entities){
-                               byte[] rowkey = 
EagleBase64Wrapper.decode(entity.getEncodedRowkey());
-                               InternalLog log = new InternalLog();
-                               
log.setQualifierValues(mapper.createQualifierValues(entity));
-                               writer.updateByRowkey(rowkey, log);
-                       }
-               }catch(IOException ioe){
-                       LOG.error("Fail writing tagged log", ioe);
-                       throw ioe;
-               }finally{
-                       writer.close();
-               }
-       }
+    private String table;
+    private String columnFamily;
+    private TaggedLogObjectMapper mapper;
+
+    /**
+     * Creates a writer bound to one table and one column family.
+     *
+     * @param mapper       produces the qualifier values stored on each log via
+     *                     {@code createQualifierValues(entity)}
+     * @param table        table name handed to every {@code HBaseLogWriter} created by this writer
+     * @param columnFamily column family handed to every {@code HBaseLogWriter} created by this writer
+     */
+    public GenericWriter(TaggedLogObjectMapper mapper, String table, String columnFamily) {
+        this.table = table;
+        this.columnFamily = columnFamily;
+        this.mapper = mapper;
+    }
+
+    /**
+     * Writes every entity as one log row and returns the rowkeys that were written.
+     *
+     * <p>For each entity a fresh {@code InternalLog} is filled with a sorted copy of the entity's tags,
+     * its timestamp, its prefix and the qualifier values produced by the mapper. An entity whose tag
+     * map is {@code null} is written with an empty tag map.
+     *
+     * @param entities entities to persist
+     * @return URL-safe Base64 encodings of the written rowkeys, in input order
+     * @throws IOException if the underlying writer fails; the failure is logged and rethrown
+     */
+    public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws IOException {
+        HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily);
+        List<String> rowkeys = new ArrayList<String>();
+
+        try {
+            writer.open();
+            for (TaggedLogAPIEntity entity : entities) {
+                InternalLog log = new InternalLog();
+                // Copy into a fresh sorted map so the log never shares the entity's own map.
+                // A null tag map is treated as "no tags" rather than aborting the batch with a
+                // NullPointerException after earlier rows have already been written.
+                Map<String, String> inputTags = entity.getTags();
+                Map<String, String> tags = new TreeMap<String, String>();
+                if (inputTags != null) {
+                    tags.putAll(inputTags);
+                }
+                log.setTags(tags);
+                log.setTimestamp(entity.getTimestamp());
+                log.setPrefix(entity.getPrefix());
+                log.setQualifierValues(mapper.createQualifierValues(entity));
+                byte[] rowkey = writer.write(log);
+                rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+            }
+        } catch (IOException ioe) {
+            LOG.error("Fail writing tagged log", ioe);
+            throw ioe;
+        } finally {
+            // Always release the writer, whether or not the batch succeeded.
+            writer.close();
+        }
+        return rowkeys;
+    }
+
+    /**
+     * Rewrites the qualifier values of rows addressed by each entity's encoded rowkey.
+     *
+     * <p>Only the qualifier values produced by the mapper are set on the log handed to the writer;
+     * tags, timestamp and prefix are not set here.
+     *
+     * @param entities entities carrying the encoded rowkey of the row to update
+     * @throws IOException if the underlying writer fails; the failure is logged and rethrown
+     */
+    public void updateByRowkey(List<? extends TaggedLogAPIEntity> entities) throws IOException {
+        final HBaseLogWriter logWriter = new HBaseLogWriter(table, columnFamily);
+        try {
+            logWriter.open();
+            for (TaggedLogAPIEntity item : entities) {
+                final byte[] decodedKey = EagleBase64Wrapper.decode(item.getEncodedRowkey());
+                final InternalLog update = new InternalLog();
+                update.setQualifierValues(mapper.createQualifierValues(item));
+                logWriter.updateByRowkey(decodedKey, update);
+            }
+        } catch (IOException failure) {
+            LOG.error("Fail writing tagged log", failure);
+            throw failure;
+        } finally {
+            // Release the writer on both the success and the failure path.
+            logWriter.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
index 37e55ac..6b49cf7 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
@@ -36,126 +36,129 @@ import org.apache.eagle.common.ByteUtil;
 import org.apache.eagle.common.EagleBase64Wrapper;
 
 /**
- * Get details of rowkey and qualifiers given a raw rowkey. This function 
mostly is used for inspecting one row's content 
- * This only supports single column family, which is mostly used in log 
application 
+ * Get details of rowkey and qualifiers given a raw rowkey. This function is mostly used for inspecting one
+ * row's content. This only supports a single column family, which is mostly used in log applications.
  */
-public class HBaseLogByRowkeyReader implements Closeable{
-       private String table;
-       private String columnFamily;
-       private byte[][] outputQualifiers;
-       private boolean includingAllQualifiers;
-       private HTableInterface tbl;
-       private boolean isOpen;
-       
-       /**
-        * if includingAllQualifiers is true, then the fourth argument 
outputQualifiers is ignored
-        * if includingAllQualifiers is false, then need calculate based on the 
fourth argument outputQualifiers
-        */
-       public HBaseLogByRowkeyReader(String table, String columnFamily, 
boolean includingAllQualifiers, List<String> qualifiers){
-               this.table = table;
-               this.columnFamily = columnFamily;
-               if(qualifiers != null){
-                       this.outputQualifiers = new byte[qualifiers.size()][];
-                       int i = 0;
-                       for(String qualifier : qualifiers){
-                               this.outputQualifiers[i++] = 
qualifier.getBytes();
-                       }
-               }
-               this.includingAllQualifiers = includingAllQualifiers;
-       }
-       
-       
-       public void open() throws IOException {
-               if (isOpen)
-                       return; // silently return
-               try {
-                       tbl = EagleConfigFactory.load().getHTable(this.table);
-               } catch (RuntimeException ex) {
-                       throw new IOException(ex);
-               }
-               
-               isOpen = true;
-       }
+public class HBaseLogByRowkeyReader implements Closeable {
+    private String table;
+    private String columnFamily;
+    private byte[][] outputQualifiers;
+    private boolean includingAllQualifiers;
+    private HTableInterface tbl;
+    private boolean isOpen;
 
-       /**
-        * Here all qualifiers' values goes into qualifierValues of InternalLog 
as given a row, we can't differentiate it's a tag or a field
-        * @param rowkeys
-        * @return
-        * @throws IOException
-        */
-       public List<InternalLog> get(List<byte[]> rowkeys) throws IOException, 
NoSuchRowException {
-               final List<Get> gets = createGets(rowkeys);
-               final Result[] results = tbl.get(gets);
-               final List<InternalLog> logs = new ArrayList<InternalLog>();
-               for (Result result : results) {
-                       final InternalLog log = buildLog(result);
-                       logs.add(log);
-               }
-               return logs;
-       }
-       
-       private List<Get> createGets(List<byte[]> rowkeys) {
-               final List<Get> gets = new ArrayList<Get>();
-               for (byte[] rowkey : rowkeys) {
-                       final Get get = createGet(rowkey);
-                       gets.add(get);
-               }
-               return gets;
-       }
+    /**
+     * Creates a reader for one table and one column family.
+     *
+     * <p>If {@code includingAllQualifiers} is true, {@code qualifiers} is ignored when a Get is built
+     * and the whole column family is requested. If it is false, the Get is restricted to the given
+     * qualifiers.
+     *
+     * @param table                  name of the table to read from
+     * @param columnFamily           the single column family this reader works on
+     * @param includingAllQualifiers whether to request every qualifier of the column family
+     * @param qualifiers             qualifier names to request; may be {@code null}, in which case no
+     *                               output qualifiers are recorded
+     */
+    public HBaseLogByRowkeyReader(String table, String columnFamily, boolean includingAllQualifiers,
+                                  List<String> qualifiers) {
+        this.table = table;
+        this.columnFamily = columnFamily;
+        if (qualifiers != null) {
+            this.outputQualifiers = new byte[qualifiers.size()][];
+            int i = 0;
+            for (String qualifier : qualifiers) {
+                // Encode with an explicit charset; the no-arg getBytes() depends on the JVM's
+                // platform default and could yield different column names on different hosts.
+                this.outputQualifiers[i++] = qualifier.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            }
+        }
+        this.includingAllQualifiers = includingAllQualifiers;
+    }
 
+    /**
+     * Obtains the table handle from {@code EagleConfigFactory}; does nothing if this reader is
+     * already open.
+     *
+     * @throws IOException wrapping any RuntimeException raised while obtaining the table
+     */
+    public void open() throws IOException {
+        if (isOpen) {
+            return; // silently return
+        }
+        try {
+            tbl = EagleConfigFactory.load().getHTable(this.table);
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
 
-       private Get createGet(byte[] rowkey) {
-               final Get get = new Get(rowkey);
-               byte[] cf = this.columnFamily.getBytes();
-               if(includingAllQualifiers){
-                       get.addFamily(cf);
-               }else{
-                       for(byte[] outputQualifier : outputQualifiers){
-                               get.addColumn(cf, outputQualifier);
-                       }
-               }
-               return get;
-       }
+        // Reached only when the table handle was obtained without an exception.
+        isOpen = true;
+    }
 
+    /**
+     * Fetches several rows with one batched table call and converts each result into an
+     * {@code InternalLog}. Every returned qualifier value goes into the log's qualifierValues,
+     * because for a raw row we cannot tell whether a column is a tag or a field.
+     *
+     * @param rowkeys raw rowkeys of the rows to read
+     * @return one log per result, in the order the results were returned by the table
+     * @throws IOException if the table call throws it
+     * @throws NoSuchRowException if a result holds no data for this reader's column family
+     */
+    public List<InternalLog> get(List<byte[]> rowkeys) throws IOException, NoSuchRowException {
+        final Result[] fetched = tbl.get(createGets(rowkeys));
+        final List<InternalLog> converted = new ArrayList<InternalLog>();
+        for (int idx = 0; idx < fetched.length; idx++) {
+            converted.add(buildLog(fetched[idx]));
+        }
+        return converted;
+    }
 
-       /**
-        * Here all qualifiers' values goes into qualifierValues of InternalLog 
as given a row, we can't differentiate it's a tag or a field
-        * @param rowkey
-        * @return
-        * @throws IOException
-        */
-       public InternalLog get(byte[] rowkey) throws IOException, 
NoSuchRowException{
-               final Get get = createGet(rowkey);
-               final Result result = tbl.get(get);
-               final InternalLog log = buildLog(result);
-               return log;
-       }
-       
-       private InternalLog buildLog(Result result) {
-               final InternalLog log = new InternalLog();
-               final byte[] rowkey = result.getRow();
-               
log.setEncodedRowkey(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-               long timestamp = ByteUtil.bytesToLong(rowkey, 4);
-               timestamp = Long.MAX_VALUE - timestamp;
-               log.setTimestamp(timestamp);
-               Map<String, byte[]> qualifierValues = new HashMap<String, 
byte[]>();
-               log.setQualifierValues(qualifierValues);
-               NavigableMap<byte[], byte[]> map = 
result.getFamilyMap(this.columnFamily.getBytes());
-               if(map == null){
-                       throw new 
NoSuchRowException(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-               }
-               for(Map.Entry<byte[], byte[]> entry : map.entrySet()){
-                       byte[] qualifier = entry.getKey();
-                       byte[] value = entry.getValue();
-                       qualifierValues.put(new String(qualifier), value);
-               }
-               return log;
-       }
+    /**
+     * Fetches a single row and converts it into an {@code InternalLog}. Every returned qualifier
+     * value goes into the log's qualifierValues, because for a raw row we cannot tell whether a
+     * column is a tag or a field.
+     *
+     * @param rowkey raw rowkey of the row to read
+     * @return the log built from the fetched row
+     * @throws IOException if the table call throws it
+     * @throws NoSuchRowException if the result holds no data for this reader's column family
+     */
+    public InternalLog get(byte[] rowkey) throws IOException, NoSuchRowException {
+        return buildLog(tbl.get(createGet(rowkey)));
+    }
 
+    /**
+     * Builds one Get per rowkey, preserving the input order.
+     */
+    private List<Get> createGets(List<byte[]> rowkeys) {
+        final List<Get> requests = new ArrayList<Get>();
+        for (byte[] key : rowkeys) {
+            requests.add(createGet(key));
+        }
+        return requests;
+    }
 
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-       }
+    /**
+     * Builds the Get for a single row.
+     *
+     * <p>With {@code includingAllQualifiers} the whole column family is requested; otherwise one
+     * column is added per configured output qualifier. If no output qualifiers were configured
+     * (the constructor received {@code null}), no column is added, which matches what an empty
+     * qualifier list already did instead of failing with a NullPointerException.
+     *
+     * @param rowkey raw rowkey of the row to fetch
+     * @return the configured Get
+     */
+    private Get createGet(byte[] rowkey) {
+        final Get get = new Get(rowkey);
+        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
+        final byte[] cf = this.columnFamily.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        if (includingAllQualifiers) {
+            get.addFamily(cf);
+        } else if (outputQualifiers != null) {
+            for (byte[] outputQualifier : outputQualifiers) {
+                get.addColumn(cf, outputQualifier);
+            }
+        }
+        // NOTE(review): a Get without any family/column presumably returns the whole row - confirm
+        // that this is the desired result when no qualifiers are configured.
+        return get;
+    }
+
+    /**
+     * Converts one table result into an {@code InternalLog}: encoded rowkey, timestamp decoded from
+     * the rowkey, and every qualifier of this reader's column family as a qualifier value.
+     *
+     * @param result result of a Get issued by this reader
+     * @return the populated log
+     * @throws NoSuchRowException if the result carries no rowkey, or no data for the column family
+     */
+    private InternalLog buildLog(Result result) {
+        final InternalLog log = new InternalLog();
+        final byte[] rowkey = result.getRow();
+        if (rowkey == null) {
+            // An empty Result (row not found) has no rowkey. Without this guard the rowkey was
+            // dereferenced below and a NullPointerException escaped instead of the
+            // NoSuchRowException this method is meant to raise for missing rows.
+            // NOTE(review): the requested rowkey is not available here, so the message cannot name it.
+            throw new NoSuchRowException("empty result: no rowkey returned");
+        }
+        final String encodedRowkey = EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey);
+        log.setEncodedRowkey(encodedRowkey);
+        // The long at offset 4 of the rowkey holds (Long.MAX_VALUE - timestamp); invert it back.
+        long timestamp = ByteUtil.bytesToLong(rowkey, 4);
+        timestamp = Long.MAX_VALUE - timestamp;
+        log.setTimestamp(timestamp);
+        Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>();
+        log.setQualifierValues(qualifierValues);
+        // Explicit charset for both directions: the no-arg getBytes()/new String(byte[]) depend on
+        // the JVM's platform default.
+        NavigableMap<byte[], byte[]> map =
+            result.getFamilyMap(this.columnFamily.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        if (map == null) {
+            throw new NoSuchRowException(encodedRowkey);
+        }
+        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
+            byte[] qualifier = entry.getKey();
+            byte[] value = entry.getValue();
+            qualifierValues.put(new String(qualifier, java.nio.charset.StandardCharsets.UTF_8), value);
+        }
+        return log;
+    }
+
+    /**
+     * Releases the table handle through a new {@code HTableFactory}; does nothing if no table was
+     * ever obtained.
+     */
+    @Override
+    public void close() throws IOException {
+        if (tbl == null) {
+            return;
+        }
+        new HTableFactory().releaseHTableInterface(tbl);
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
index 3460949..ae69321 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
@@ -30,102 +30,105 @@ import org.apache.eagle.log.entity.InternalLog;
 import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.common.EagleBase64Wrapper;
 
-public class HBaseLogDeleter implements LogDeleter{
-       private HTableInterface tbl;
-       private String table;
-       private String columnFamily;
-       
-       public HBaseLogDeleter(String table, String columnFamily) {
-               this.table = table;
-               this.columnFamily = columnFamily;
-       }
-       
-       @Override
-       public void open() throws IOException {
-               try{
-                       tbl = EagleConfigFactory.load().getHTable(this.table);
-               }catch(RuntimeException ex){
-                       throw new IOException(ex);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-       }
-       
-       @Override
-       public void flush() throws IOException{
-               throw new IllegalArgumentException("Not supported flush for 
hbase delete");
-       }
-       
-       /**
-        * support delete by constructing a rowkey or by encoded rowkey passed 
from client
-        */
-       @Override
-       public void delete(InternalLog log) throws IOException{
-               final byte[] rowkey = RowkeyHelper.getRowkey(log);
-               final Delete delete = createDelete(rowkey);
-               tbl.delete(delete);
-       }
-       
-       public void delete(TaggedLogAPIEntity entity, EntityDefinition 
entityDef) throws Exception {
-               final byte[] rowkey = RowkeyHelper.getRowkey(entity, entityDef);
-               final Delete delete = createDelete(rowkey);
-               tbl.delete(delete);
-       }
-       
-       /**
-        * Batch delete
-        * @param logs
-        * @throws IOException
-        */
-       public void delete(List<InternalLog> logs) throws IOException{
-               final List<byte[]> rowkeys = 
RowkeyHelper.getRowkeysByLogs(logs);
-               deleteRowkeys(rowkeys);
-       }
-
-
-       /**
-        * Batch delete
-        * @throws Exception
-        */
-       public void deleteEntities(List<? extends TaggedLogAPIEntity> entities, 
EntityDefinition entityDef) throws Exception{
-               final List<byte[]> rowkeys = 
RowkeyHelper.getRowkeysByEntities(entities, entityDef);
-               deleteRowkeys(rowkeys);
-       }
-       
-       /**
-        * Batch delete
-        * @throws IOException
-        */
-       public void deleteRowkeys(List<byte[]> rowkeys) throws IOException {
-               final List<Delete> deletes = new 
ArrayList<Delete>(rowkeys.size());
-               for (byte[] rowkey : rowkeys) {
-                       final Delete delete = createDelete(rowkey);
-                       deletes.add(delete);
-               }
-               tbl.delete(deletes);
-       }
-       
-       @Override
-       public void deleteRowByRowkey(String encodedRowkey) throws IOException{
-               byte[] row = EagleBase64Wrapper.decode(encodedRowkey);
-               final Delete delete = createDelete(row);
-               tbl.delete(delete);
-       }
-
-       public void deleteRowByRowkey(List<String> encodedRowkeys) throws 
IOException {
-               final List<byte[]> rowkeys = 
RowkeyHelper.getRowkeysByEncodedRowkeys(encodedRowkeys);
-               deleteRowkeys(rowkeys);
-       }
-       
-       private Delete createDelete(byte[] row) throws IOException{
-               Delete delete = new Delete(row);
-               delete.deleteFamily(columnFamily.getBytes());
-               return delete;
-       }
+/**
+ * Deletes log rows from one table by removing this deleter's column family from each addressed row.
+ * {@link #open()} must be called before any delete method, because that is where the table handle
+ * is obtained.
+ */
+public class HBaseLogDeleter implements LogDeleter {
+    private HTableInterface tbl;
+    private String table;
+    private String columnFamily;
+
+    /**
+     * Creates a deleter for the given table and column family.
+     *
+     * @param table        name of the table to delete from
+     * @param columnFamily column family removed from every deleted row
+     */
+    public HBaseLogDeleter(String table, String columnFamily) {
+        this.table = table;
+        this.columnFamily = columnFamily;
+    }
+
+    /**
+     * Obtains the table handle from {@code EagleConfigFactory}.
+     *
+     * @throws IOException wrapping any RuntimeException raised while obtaining the table
+     */
+    @Override
+    public void open() throws IOException {
+        try {
+            tbl = EagleConfigFactory.load().getHTable(this.table);
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Releases the table handle through a new {@code HTableFactory}; does nothing if no table was
+     * ever obtained.
+     */
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+    }
+
+    /**
+     * Flushing is not supported for deletes; this method always throws.
+     *
+     * @throws IllegalArgumentException always
+     */
+    @Override
+    public void flush() throws IOException {
+        throw new IllegalArgumentException("Not supported flush for hbase delete");
+    }
+
+    /**
+     * Deletes the row whose rowkey {@code RowkeyHelper} derives from the log. Per the original
+     * note, the rowkey is either constructed from the log or taken from an encoded rowkey passed
+     * from the client.
+     *
+     * @param log log identifying the row to delete
+     * @throws IOException if the table call throws it
+     */
+    @Override
+    public void delete(InternalLog log) throws IOException {
+        final byte[] rowkey = RowkeyHelper.getRowkey(log);
+        final Delete delete = createDelete(rowkey);
+        tbl.delete(delete);
+    }
+
+    /**
+     * Deletes the row whose rowkey {@code RowkeyHelper} derives from the entity and its definition.
+     *
+     * @param entity    entity identifying the row to delete
+     * @param entityDef definition used to compute the entity's rowkey
+     * @throws Exception if computing the rowkey or the table call fails
+     */
+    public void delete(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception {
+        final byte[] rowkey = RowkeyHelper.getRowkey(entity, entityDef);
+        final Delete delete = createDelete(rowkey);
+        tbl.delete(delete);
+    }
+
+    /**
+     * Batch delete of the rows identified by the given logs.
+     *
+     * @param logs logs identifying the rows to delete
+     * @throws IOException if the table call throws it
+     */
+    public void delete(List<InternalLog> logs) throws IOException {
+        final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByLogs(logs);
+        deleteRowkeys(rowkeys);
+    }
+
+    /**
+     * Batch delete of the rows identified by the given entities.
+     *
+     * @param entities  entities identifying the rows to delete
+     * @param entityDef definition used to compute each entity's rowkey
+     * @throws Exception if computing the rowkeys or the table call fails
+     */
+    public void deleteEntities(List<? extends TaggedLogAPIEntity> entities, EntityDefinition entityDef)
+        throws Exception {
+        final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entities, entityDef);
+        deleteRowkeys(rowkeys);
+    }
+
+    /**
+     * Batch delete of the rows with the given raw rowkeys, issued as a single table call.
+     *
+     * @param rowkeys raw rowkeys of the rows to delete
+     * @throws IOException if the table call throws it
+     */
+    public void deleteRowkeys(List<byte[]> rowkeys) throws IOException {
+        final List<Delete> deletes = new ArrayList<Delete>(rowkeys.size());
+        for (byte[] rowkey : rowkeys) {
+            final Delete delete = createDelete(rowkey);
+            deletes.add(delete);
+        }
+        tbl.delete(deletes);
+    }
+
+    /**
+     * Deletes the row addressed by an encoded rowkey.
+     *
+     * @param encodedRowkey rowkey in the encoding understood by {@code EagleBase64Wrapper.decode}
+     * @throws IOException if the table call throws it
+     */
+    @Override
+    public void deleteRowByRowkey(String encodedRowkey) throws IOException {
+        byte[] row = EagleBase64Wrapper.decode(encodedRowkey);
+        final Delete delete = createDelete(row);
+        tbl.delete(delete);
+    }
+
+    /**
+     * Batch delete of the rows addressed by encoded rowkeys.
+     *
+     * @param encodedRowkeys encoded rowkeys of the rows to delete
+     * @throws IOException if the table call throws it
+     */
+    public void deleteRowByRowkey(List<String> encodedRowkeys) throws IOException {
+        final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEncodedRowkeys(encodedRowkeys);
+        deleteRowkeys(rowkeys);
+    }
+
+    /**
+     * Builds a Delete that removes this deleter's column family from the given row.
+     */
+    private Delete createDelete(byte[] row) throws IOException {
+        Delete delete = new Delete(row);
+        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default, so the
+        // family name could be encoded differently on differently configured hosts.
+        delete.deleteFamily(columnFamily.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        return delete;
+    }
 
 }

Reply via email to