http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
index c3d916e..6dfe27d 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
@@ -29,97 +29,103 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityStreamReader extends StreamReader {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityStreamReader.class);
-       
-       private EntityDefinition entityDef;
-       private SearchCondition condition;
-       private String prefix;
-       private StreamReader readerAfterPlan;
-
-       public GenericEntityStreamReader(String serviceName, SearchCondition 
condition) throws InstantiationException, IllegalAccessException{
-               this(serviceName, condition, null);
-       }
-
-       public GenericEntityStreamReader(EntityDefinition entityDef, 
SearchCondition condition) throws InstantiationException, 
IllegalAccessException{
-               this(entityDef, condition, entityDef.getPrefix());
-       }
-       
-       public GenericEntityStreamReader(String serviceName, SearchCondition 
condition, String prefix) throws InstantiationException, IllegalAccessException{
-               this.prefix = prefix;
-               checkNotNull(serviceName, "serviceName");
-               this.entityDef = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkNotNull(entityDef, "EntityDefinition");
-               this.condition = condition;
-               this.readerAfterPlan = selectQueryReader();
-       }
-
-       public GenericEntityStreamReader(EntityDefinition entityDef, 
SearchCondition condition, String prefix) throws InstantiationException, 
IllegalAccessException{
-               this.prefix = prefix;
-               checkNotNull(entityDef, "entityDef");
-               this.entityDef = entityDef;
-               checkNotNull(entityDef, "EntityDefinition");
-               this.condition = condition;
-               this.readerAfterPlan = selectQueryReader();
-       }
-
-       private void checkNotNull(Object o, String message){
-               if(o == null){
-                       throw new IllegalArgumentException(message + " should 
not be null");
-               }
-       }
-       
-       public EntityDefinition getEntityDefinition() {
-               return entityDef;
-       }
-       
-       public SearchCondition getSearchCondition() {
-               return condition;
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               readerAfterPlan._listeners.addAll(this._listeners);
-               readerAfterPlan.readAsStream();
-       }
-       
-       private StreamReader selectQueryReader() throws InstantiationException, 
IllegalAccessException {
-               final ORExpression query = condition.getQueryExpression();
-               IndexDefinition[] indexDefs = entityDef.getIndexes();
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityStreamReader.class);
+
+    private EntityDefinition entityDef;
+    private SearchCondition condition;
+    private String prefix;
+    private StreamReader readerAfterPlan;
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition 
condition)
+        throws InstantiationException, IllegalAccessException {
+        this(serviceName, condition, null);
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, 
SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        this(entityDef, condition, entityDef.getPrefix());
+    }
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition 
condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(serviceName, "serviceName");
+        this.entityDef = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, 
SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(entityDef, "entityDef");
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
+
+    public EntityDefinition getEntityDefinition() {
+        return entityDef;
+    }
+
+    public SearchCondition getSearchCondition() {
+        return condition;
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        readerAfterPlan.listeners.addAll(this.listeners);
+        readerAfterPlan.readAsStream();
+    }
+
+    private StreamReader selectQueryReader() throws InstantiationException, 
IllegalAccessException {
+        final ORExpression query = condition.getQueryExpression();
+        IndexDefinition[] indexDefs = entityDef.getIndexes();
 
         // Index just works with query condition
-               if (indexDefs != null && condition.getQueryExpression()!=null) {
-                       List<byte[]> rowkeys = new ArrayList<>();
-                       for (IndexDefinition index : indexDefs) {
-                               // Check unique index first
-                               if (index.isUnique()) {
-                                       final IndexDefinition.IndexType type = 
index.canGoThroughIndex(query, rowkeys);
-                                       if 
(!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-                                               LOG.info("Selectd query unique 
index " + index.getIndexName() + " for query: " + 
condition.getQueryExpression());
-                                               return new 
UniqueIndexStreamReader(index, condition, rowkeys);
-                                       }
-                               }
-                       }
-                       for (IndexDefinition index : indexDefs) {
-                               // Check non-clustered index
-                               if (!index.isUnique()) {
-                                       final IndexDefinition.IndexType type = 
index.canGoThroughIndex(query, rowkeys);
-                                       if 
(!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-                                               LOG.info("Selectd query non 
clustered index " + index.getIndexName() + " for query: " + 
condition.getQueryExpression().toString());
-                                               return new 
NonClusteredIndexStreamReader(index, condition, rowkeys);
-                                       }
-                               }
-                       }
-               }
-               return new GenericEntityScanStreamReader(entityDef, condition, 
this.prefix);
-       }
-
-       @Override
-       public long getLastTimestamp() {
-               return readerAfterPlan.getLastTimestamp();
-       }
-
-       @Override
-       public long getFirstTimestamp() {
-               return readerAfterPlan.getFirstTimestamp();
-       }
+        if (indexDefs != null && condition.getQueryExpression() != null) {
+            List<byte[]> rowkeys = new ArrayList<>();
+            for (IndexDefinition index : indexDefs) {
+                // Check unique index first
+                if (index.isUnique()) {
+                    final IndexDefinition.IndexType type = 
index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query unique index " + 
index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression());
+                        return new UniqueIndexStreamReader(index, condition, 
rowkeys);
+                    }
+                }
+            }
+            for (IndexDefinition index : indexDefs) {
+                // Check non-clustered index
+                if (!index.isUnique()) {
+                    final IndexDefinition.IndexType type = 
index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query non clustered index " + 
index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression().toString());
+                        return new NonClusteredIndexStreamReader(index, 
condition, rowkeys);
+                    }
+                }
+            }
+        }
+        return new GenericEntityScanStreamReader(entityDef, condition, 
this.prefix);
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return readerAfterPlan.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return readerAfterPlan.getFirstTimestamp();
+    }
 }
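
For reference, the reader-selection order implemented by selectQueryReader() above is: try a unique index first, then any usable non-clustered index, and only fall back to a full table scan when neither applies. The standalone sketch below restates that priority order; the Index interface and ReaderKind enum are simplified stand-ins invented for illustration, not Eagle types.

    // Illustrative sketch only: mirrors the selection order of selectQueryReader().
    import java.util.List;

    public class ReaderPlannerSketch {
        interface Index {
            boolean isUnique();
            boolean matches(String query); // stand-in for IndexDefinition.canGoThroughIndex(...)
        }

        enum ReaderKind { UNIQUE_INDEX, NON_CLUSTERED_INDEX, FULL_SCAN }

        static ReaderKind plan(List<Index> indexes, String query) {
            if (indexes != null && query != null) {
                for (Index idx : indexes) {              // unique indexes take priority
                    if (idx.isUnique() && idx.matches(query)) {
                        return ReaderKind.UNIQUE_INDEX;
                    }
                }
                for (Index idx : indexes) {              // then any non-clustered index
                    if (!idx.isUnique() && idx.matches(query)) {
                        return ReaderKind.NON_CLUSTERED_INDEX;
                    }
                }
            }
            return ReaderKind.FULL_SCAN;                 // otherwise scan the whole table
        }
    }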

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
index bf72a36..15bdd20 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
@@ -31,121 +31,124 @@ import org.apache.eagle.common.DateTimeUtil;
 
 /**
  * multi-threading stream readers which only applies to time-series entity 
where we split the query into
- * different time range
- * 
- * When this class is used together with list query or aggregate query, be 
aware that the query's behavior could
- * be changed for example pageSize does not work well, output sequence is not 
determined
+ * different time ranges. When this class is used together with a list query or aggregate query, be aware
+ * that the query's behavior may change; for example, pageSize does not work well and the output sequence
+ * is not deterministic.
  */
-public class GenericEntityStreamReaderMT extends StreamReader{
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
-       private List<GenericEntityStreamReader> readers = new 
ArrayList<GenericEntityStreamReader>(); 
-       
-       public GenericEntityStreamReaderMT(String serviceName, SearchCondition 
condition, int numThreads) throws Exception{
-               checkIsTimeSeries(serviceName);
-               checkNumThreads(numThreads);
-               long queryStartTime = condition.getStartTime();
-               long queryEndTime = condition.getEndTime();
-               long subStartTime = queryStartTime;
-               long subEndTime = 0;
-               long interval = (queryEndTime-queryStartTime) / numThreads;
-               for(int i=0; i<numThreads; i++){
-                       // split search condition by time range
-                       subStartTime = queryStartTime + i*interval;
-                       if(i == numThreads-1){
-                               subEndTime = queryEndTime;
-                       }else{
-                               subEndTime = subStartTime + interval;
-                       }
-                       //String strStartTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
-                       //String strEndTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
-                       SearchCondition sc = new SearchCondition(condition);
-                       sc.setStartTime(subStartTime);
-                       sc.setEndTime(subEndTime);
-                       GenericEntityStreamReader reader = new 
GenericEntityStreamReader(serviceName, sc);
-                       readers.add(reader);
-               }
-       }
-       
-       private void checkIsTimeSeries(String serviceName) throws Exception{
-               EntityDefinition ed = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
-               if(!ed.isTimeSeries()){
-                       throw new IllegalArgumentException("Multi-threading 
stream reader must be applied to time series table");
-               }
-       }
-       
-       private void checkNumThreads(int numThreads){
-               if(numThreads <= 0){
-                       throw new IllegalArgumentException("Multi-threading 
stream reader must have numThreads >= 1");
-               }
-       }
-       
-       /**
-        * default to 2 threads
-        * @param serviceName
-        * @param condition
-        */
-       public GenericEntityStreamReaderMT(String serviceName, SearchCondition 
condition) throws Exception{
-               this(serviceName, condition, 2);
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               // populate listeners to all readers
-               for(EntityCreationListener l : _listeners){
-                       for(GenericEntityStreamReader r : readers){
-                               r.register(l);
-                       }
-               }
-
-               List<Future<Void>> futures = new ArrayList<Future<Void>>();
-               for(GenericEntityStreamReader r : readers){
-                       SingleReader reader = new SingleReader(r);
-                       Future<Void> readFuture = 
EagleConfigFactory.load().getExecutor().submit(reader);
-                       futures.add(readFuture);
-               }
-               
-               // join threads and check exceptions
-               for(Future<Void> future : futures){
-                       try{
-                               future.get();
-                       }catch(Exception ex){
-                               LOG.error("Error in read", ex);
-                               throw ex;
-                       }
-               }
-       }
-       
-       private static class SingleReader implements Callable<Void>{
-               private GenericEntityStreamReader reader;
-               public SingleReader(GenericEntityStreamReader reader){
-                       this.reader = reader;
-               }
-               @Override
-               public Void call() throws Exception{
-                       reader.readAsStream();
-                       return null;
-               }
-       }
-
-       @Override
-       public long getLastTimestamp() {
-               long lastTimestamp = 0;
-               for (GenericEntityStreamReader reader : readers) {
-                       if (lastTimestamp < reader.getLastTimestamp()) {
-                               lastTimestamp = reader.getLastTimestamp();
-                       }
-               }
-               return lastTimestamp;
-       }
-
-       @Override
-       public long getFirstTimestamp() {
-               long firstTimestamp = 0;
-               for (GenericEntityStreamReader reader : readers) {
-                       if (firstTimestamp > reader.getLastTimestamp() || 
firstTimestamp == 0) {
-                               firstTimestamp = reader.getLastTimestamp();
-                       }
-               }
-               return firstTimestamp;
-       }
+public class GenericEntityStreamReaderMT extends StreamReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
+    private List<GenericEntityStreamReader> readers = new 
ArrayList<GenericEntityStreamReader>();
+
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition 
condition, int numThreads)
+        throws Exception {
+        checkIsTimeSeries(serviceName);
+        checkNumThreads(numThreads);
+        long queryStartTime = condition.getStartTime();
+        long queryEndTime = condition.getEndTime();
+        long subStartTime = queryStartTime;
+        long subEndTime = 0;
+        long interval = (queryEndTime - queryStartTime) / numThreads;
+        for (int i = 0; i < numThreads; i++) {
+            // split search condition by time range
+            subStartTime = queryStartTime + i * interval;
+            if (i == numThreads - 1) {
+                subEndTime = queryEndTime;
+            } else {
+                subEndTime = subStartTime + interval;
+            }
+            // String strStartTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
+            // String strEndTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
+            SearchCondition sc = new SearchCondition(condition);
+            sc.setStartTime(subStartTime);
+            sc.setEndTime(subEndTime);
+            GenericEntityStreamReader reader = new 
GenericEntityStreamReader(serviceName, sc);
+            readers.add(reader);
+        }
+    }
+
+    private void checkIsTimeSeries(String serviceName) throws Exception {
+        EntityDefinition ed = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
+        if (!ed.isTimeSeries()) {
+            throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
+        }
+    }
+
+    private void checkNumThreads(int numThreads) {
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
+        }
+    }
+
+    /**
+     * default to 2 threads
+     *
+     * @param serviceName
+     * @param condition
+     */
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition 
condition) throws Exception {
+        this(serviceName, condition, 2);
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        // populate listeners to all readers
+        for (EntityCreationListener l : listeners) {
+            for (GenericEntityStreamReader r : readers) {
+                r.register(l);
+            }
+        }
+
+        List<Future<Void>> futures = new ArrayList<Future<Void>>();
+        for (GenericEntityStreamReader r : readers) {
+            SingleReader reader = new SingleReader(r);
+            Future<Void> readFuture = 
EagleConfigFactory.load().getExecutor().submit(reader);
+            futures.add(readFuture);
+        }
+
+        // join threads and check exceptions
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (Exception ex) {
+                LOG.error("Error in read", ex);
+                throw ex;
+            }
+        }
+    }
+
+    private static class SingleReader implements Callable<Void> {
+        private GenericEntityStreamReader reader;
+
+        public SingleReader(GenericEntityStreamReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            reader.readAsStream();
+            return null;
+        }
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        long lastTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (lastTimestamp < reader.getLastTimestamp()) {
+                lastTimestamp = reader.getLastTimestamp();
+            }
+        }
+        return lastTimestamp;
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        long firstTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (firstTimestamp > reader.getFirstTimestamp() || firstTimestamp == 0) {
+                firstTimestamp = reader.getFirstTimestamp();
+            }
+        }
+        return firstTimestamp;
+    }
 }
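
The MT constructor above divides the query window [startTime, endTime) into numThreads contiguous sub-ranges and gives each one its own GenericEntityStreamReader; the last range absorbs the remainder of the integer division. A minimal self-contained sketch of that arithmetic (the split() helper and its long[][] return shape are invented for illustration):

    public class TimeRangeSplitSketch {
        // Returns numThreads [subStart, subEnd) pairs that together cover [startTime, endTime).
        static long[][] split(long startTime, long endTime, int numThreads) {
            long interval = (endTime - startTime) / numThreads;
            long[][] ranges = new long[numThreads][2];
            for (int i = 0; i < numThreads; i++) {
                long subStart = startTime + i * interval;
                long subEnd = (i == numThreads - 1) ? endTime : subStart + interval;
                ranges[i][0] = subStart;
                ranges[i][1] = subEnd;
            }
            return ranges;
        }

        public static void main(String[] args) {
            // A 10-second window split across 3 threads: [0,3333), [3333,6666), [6666,10000)
            for (long[] r : split(0L, 10_000L, 3)) {
                System.out.println(r[0] + " .. " + r[1]);
            }
        }
    }

Because each sub-range is read by its own thread, ordering across ranges is not guaranteed, which is the caveat the class javadoc gives for list and aggregate queries.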

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
index 5c8b12d..926fcba 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
@@ -27,52 +27,53 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityWriter {
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityWriter.class);
-       private EntityDefinition entityDef;
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityWriter.class);
+    private EntityDefinition entityDef;
 
-       public GenericEntityWriter(String serviceName) throws 
InstantiationException, IllegalAccessException{
-               this.entityDef = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkNotNull(entityDef, "serviceName");
-       }
+    public GenericEntityWriter(String serviceName) throws 
InstantiationException, IllegalAccessException {
+        this.entityDef = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "serviceName");
+    }
 
-       public GenericEntityWriter(EntityDefinition entityDef) throws 
InstantiationException, IllegalAccessException{
-               this.entityDef = entityDef;
-               checkNotNull(entityDef, "serviceName");
-       }
-       
-       private void checkNotNull(Object o, String message) {
-               if(o == null){
-                       throw new IllegalArgumentException(message + " should 
not be null");
-               }
-       }
+    public GenericEntityWriter(EntityDefinition entityDef)
+        throws InstantiationException, IllegalAccessException {
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "serviceName");
+    }
 
-       /**
-        * @param entities
-        * @return row keys
-        * @throws Exception
-        */
-       public List<String> write(List<? extends TaggedLogAPIEntity> entities) 
throws Exception{
-               HBaseLogWriter writer = new 
HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
-               List<String> rowkeys = new ArrayList<String>(entities.size());
-               List<InternalLog> logs = new 
ArrayList<InternalLog>(entities.size());
-               
-               try{
-                       writer.open();
-                       for(TaggedLogAPIEntity entity : entities){
-                               final InternalLog entityLog = 
HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
-                               logs.add(entityLog);
-                       }
-                       List<byte[]> bRowkeys  = writer.write(logs);
-                       for (byte[] rowkey : bRowkeys) {
-                               
rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-                       }
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
 
-               }catch(Exception ex){
-                       LOG.error("fail writing tagged log", ex);
-                       throw ex;
-               }finally{
-                       writer.close();
-               }
-               return rowkeys;
-       }
+    /**
+     * @param entities
+     * @return row keys
+     * @throws Exception
+     */
+    public List<String> write(List<? extends TaggedLogAPIEntity> entities) 
throws Exception {
+        HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), 
entityDef.getColumnFamily());
+        List<String> rowkeys = new ArrayList<String>(entities.size());
+        List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
+
+        try {
+            writer.open();
+            for (TaggedLogAPIEntity entity : entities) {
+                final InternalLog entityLog = 
HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
+                logs.add(entityLog);
+            }
+            List<byte[]> bRowkeys = writer.write(logs);
+            for (byte[] rowkey : bRowkeys) {
+                rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+            }
+
+        } catch (Exception ex) {
+            LOG.error("fail writing tagged log", ex);
+            throw ex;
+        } finally {
+            writer.close();
+        }
+        return rowkeys;
+    }
 }
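
As a usage note, GenericEntityWriter.write() converts each TaggedLogAPIEntity into an InternalLog, writes the batch through HBaseLogWriter, and returns the URL-safe base64-encoded row keys. A hypothetical caller could look like the sketch below; "SampleService" is an invented service name, and a configured Eagle/HBase environment with a registered entity definition is assumed.

    import java.util.List;

    import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
    import org.apache.eagle.log.entity.GenericEntityWriter;

    public class WriterUsageSketch {
        public static List<String> persist(List<? extends TaggedLogAPIEntity> entities) throws Exception {
            GenericEntityWriter writer = new GenericEntityWriter("SampleService"); // hypothetical service name
            return writer.write(entities); // encoded row keys of the newly written rows
        }
    }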

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
index 9f6937b..56cd453 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
@@ -21,33 +21,35 @@ import org.apache.eagle.log.entity.meta.*;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
- * GenericMetricEntity should use prefix field which is extended from 
TaggedLogAPIEntity as metric name
- * metric name is used to partition the metric tables
+ * GenericMetricEntity should use the prefix field, inherited from TaggedLogAPIEntity, as the metric name;
+ * the metric name is used to partition the metric tables.
  */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eagle_metric")
 @ColumnFamily("f")
 @Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER)
 @Service(GenericMetricEntity.GENERIC_METRIC_SERVICE)
 @TimeSeries(true)
-@Metric(interval=60000)
+@Metric(interval = 60000)
 @ServicePath(path = "/metric")
 // TODO:
-@Tags({"site","application","policyId","alertExecutorId", 
"streamName","source","partitionSeq"})
+@Tags({
+       "site", "application", "policyId", "alertExecutorId", "streamName", 
"source", "partitionSeq"
+    })
 public class GenericMetricEntity extends TaggedLogAPIEntity {
-       public static final String GENERIC_METRIC_SERVICE = 
"GenericMetricService";
-       public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = 
"GENERIC_METRIC_PREFIX_PLACEHODLER";
-       public static final String VALUE_FIELD ="value";
+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+    public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = 
"GENERIC_METRIC_PREFIX_PLACEHODLER";
+    public static final String VALUE_FIELD = "value";
 
-       @Column("a")
-       private double[] value;
+    @Column("a")
+    private double[] value;
 
-       public double[] getValue() {
-               return value;
-       }
+    public double[] getValue() {
+        return value;
+    }
 
-       public void setValue(double[] value) {
-               this.value = value;
-               pcs.firePropertyChange("value", null, null);
-       }
-}
\ No newline at end of file
+    public void setValue(double[] value) {
+        this.value = value;
+        pcs.firePropertyChange("value", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
index 84b02ae..bc99a81 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
@@ -23,32 +23,37 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class GenericMetricEntityBatchReader  implements EntityCreationListener{
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityBatchReader.class);
-       
-       private List<TaggedLogAPIEntity> entities = new 
ArrayList<TaggedLogAPIEntity>();
-       private GenericEntityStreamReader reader;
-       
-       public GenericMetricEntityBatchReader(String metricName, 
SearchCondition condition) throws Exception{
-               reader = new 
GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, 
condition, metricName);
-       }
-       
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
-       public long getFirstTimestamp() {
-               return reader.getFirstTimestamp();
-       }
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity){
-               entities.add(entity);
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T> List<T> read() throws Exception{
-               if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch 
mode");
-               reader.register(this);
-               reader.readAsStream();
-               return (List<T>)entities;
-       }
+public class GenericMetricEntityBatchReader implements EntityCreationListener {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GenericEntityBatchReader.class);
+
+    private List<TaggedLogAPIEntity> entities = new 
ArrayList<TaggedLogAPIEntity>();
+    private GenericEntityStreamReader reader;
+
+    public GenericMetricEntityBatchReader(String metricName, SearchCondition 
condition) throws Exception {
+        reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition,
+                                               metricName);
+    }
+
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> read() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start reading as batch mode");
+        }
+        reader.register(this);
+        reader.readAsStream();
+        return (List<T>)entities;
+    }
 }
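
GenericMetricEntityBatchReader is the batch-mode counterpart of the stream readers: it registers itself as an EntityCreationListener, buffers every entity pushed by the underlying GenericEntityStreamReader, and returns the buffer from read(). The generic sketch below isolates that collect-from-a-stream pattern; the Listener interface and collect() helper are simplified stand-ins, not Eagle APIs.

    import java.util.ArrayList;
    import java.util.List;

    public class BatchCollectSketch {
        interface Listener<T> {
            void created(T item); // analogous to entityCreated(...)
        }

        static <T> List<T> collect(Iterable<T> stream) {
            List<T> buffer = new ArrayList<>();
            Listener<T> collector = buffer::add; // the listener simply buffers what it receives
            for (T item : stream) {
                collector.created(item);         // the streamer pushes items to its listeners
            }
            return buffer;                       // read() then hands back the whole batch
        }
    }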

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
index 1cf3905..216022f 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
@@ -25,74 +25,79 @@ import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 
-public class GenericMetricEntityDecompactionStreamReader extends StreamReader 
implements EntityCreationListener{
-       @SuppressWarnings("unused")
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class);
-       private GenericEntityStreamReader reader;
-       private EntityDefinition ed;
-       private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
-       private long start;
-       private long end;
-       private GenericMetricShadowEntity single = new 
GenericMetricShadowEntity();
-       
-       /**
-        * it makes sense that serviceName should not be provided while metric 
name should be provided as prefix
-        * @param metricName
-        * @param condition
-        * @throws InstantiationException
-        * @throws IllegalAccessException
-        * @throws ParseException
-        */
-       public GenericMetricEntityDecompactionStreamReader(String metricName, 
SearchCondition condition) throws InstantiationException, 
IllegalAccessException, ParseException{
-               ed = 
EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkIsMetric(ed);
-               reader = new GenericEntityStreamReader(serviceName, condition, 
metricName);
-               start = condition.getStartTime();
-               end = condition.getEndTime();
-       }
-       
-       private void checkIsMetric(EntityDefinition ed){
-               if(ed.getMetricDefinition() == null)
-                       throw new IllegalArgumentException("Only metric entity 
comes here");
-       }
-       
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-               GenericMetricEntity e = (GenericMetricEntity)entity;
-               double[] value = e.getValue();
-               if(value != null) {
-                       int count =value.length;
-                       @SuppressWarnings("unused")
-                       Class<?> cls = 
ed.getMetricDefinition().getSingleTimestampEntityClass();
-                       for (int i = 0; i < count; i++) {
-                               long ts = entity.getTimestamp() + i * 
ed.getMetricDefinition().getInterval();
-                               // exclude those entity which is not within the 
time range in search condition. [start, end)
-                               if (ts < start || ts >= end) {
-                                       continue;
-                               }
-                               single.setTimestamp(ts);
-                               single.setTags(entity.getTags());
-                               single.setValue(e.getValue()[i]);
-                               for (EntityCreationListener l : _listeners) {
-                                       l.entityCreated(single);
-                               }
-                       }
-               }
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               reader.register(this);
-               reader.readAsStream();
-       }
+public class GenericMetricEntityDecompactionStreamReader extends StreamReader
+    implements EntityCreationListener {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory
+        .getLogger(GenericMetricEntityDecompactionStreamReader.class);
+    private GenericEntityStreamReader reader;
+    private EntityDefinition ed;
+    private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
+    private long start;
+    private long end;
+    private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
 
-       @Override
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
+    /**
+     * It makes sense that serviceName need not be provided, while the metric name should be provided as the prefix.
+     *
+     * @param metricName
+     * @param condition
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ParseException
+     */
+    public GenericMetricEntityDecompactionStreamReader(String metricName, 
SearchCondition condition)
+        throws InstantiationException, IllegalAccessException, ParseException {
+        ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkIsMetric(ed);
+        reader = new GenericEntityStreamReader(serviceName, condition, 
metricName);
+        start = condition.getStartTime();
+        end = condition.getEndTime();
+    }
 
-       @Override
-       public long getFirstTimestamp() {
-               return reader.getFirstTimestamp();
-       }
-}
\ No newline at end of file
+    private void checkIsMetric(EntityDefinition ed) {
+        if (ed.getMetricDefinition() == null) {
+            throw new IllegalArgumentException("Only metric entity comes here");
+        }
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        GenericMetricEntity e = (GenericMetricEntity)entity;
+        double[] value = e.getValue();
+        if (value != null) {
+            int count = value.length;
+            @SuppressWarnings("unused")
+            Class<?> cls = 
ed.getMetricDefinition().getSingleTimestampEntityClass();
+            for (int i = 0; i < count; i++) {
+                long ts = entity.getTimestamp() + i * 
ed.getMetricDefinition().getInterval();
+                // exclude those entity which is not within the time range in 
search condition. [start, end)
+                if (ts < start || ts >= end) {
+                    continue;
+                }
+                single.setTimestamp(ts);
+                single.setTags(entity.getTags());
+                single.setValue(e.getValue()[i]);
+                for (EntityCreationListener l : listeners) {
+                    l.entityCreated(single);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        reader.register(this);
+        reader.readAsStream();
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+}
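
The entityCreated() callback above de-compacts a metric row: the stored double[] holds one sample per metric interval starting at the entity timestamp, and each element is re-emitted as an individual point, keeping only timestamps inside the query window [start, end). A self-contained sketch of that expansion (the Point class and expand() helper are invented for illustration):

    import java.util.ArrayList;
    import java.util.List;

    public class MetricDecompactionSketch {
        static class Point {
            final long timestamp;
            final double value;

            Point(long timestamp, double value) {
                this.timestamp = timestamp;
                this.value = value;
            }
        }

        // base = row timestamp, intervalMs = metric interval, [start, end) = query window
        static List<Point> expand(long base, long intervalMs, double[] values, long start, long end) {
            List<Point> points = new ArrayList<>();
            for (int i = 0; i < values.length; i++) {
                long ts = base + i * intervalMs;
                if (ts < start || ts >= end) {
                    continue; // outside the query window
                }
                points.add(new Point(ts, values[i]));
            }
            return points;
        }
    }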

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
index acd1290..8ead7cd 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
@@ -22,13 +22,13 @@ import 
org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
  * just a shadow class to avoid dynamically create the class and instantiate 
using reflection
  */
 public class GenericMetricShadowEntity extends TaggedLogAPIEntity {
-       private double value;
+    private double value;
 
-       public double getValue() {
-               return value;
-       }
+    public double getValue() {
+        return value;
+    }
 
-       public void setValue(double value) {
-               this.value = value;
-       }
+    public void setValue(double value) {
+        this.value = value;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index 6869c7c..97f538c 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -35,24 +35,27 @@ import java.util.Map;
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success","exception","meta","type","obj"})
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@XmlType(propOrder = {
+                      "success", "exception", "meta", "type", "obj"
+    })
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class)
-@JsonIgnoreProperties(ignoreUnknown=true)
-public class GenericServiceAPIResponseEntity<T>{
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenericServiceAPIResponseEntity<T> {
     /**
      * Please use primitive type of value in meta as possible
      */
-    private Map<String,Object> meta;
-       private boolean success;
-       private String exception;
+    private Map<String, Object> meta;
+    private boolean success;
+    private String exception;
     private List<T> obj;
     private Class<T> type;
 
-    public GenericServiceAPIResponseEntity(){
+    public GenericServiceAPIResponseEntity() {
         // default constructor
     }
-    public GenericServiceAPIResponseEntity(Class<T> type){
+
+    public GenericServiceAPIResponseEntity(Class<T> type) {
         this.setType(type);
     }
 
@@ -72,7 +75,7 @@ public class GenericServiceAPIResponseEntity<T>{
         this.obj = obj;
     }
 
-    public void setObj(List<T> obj,Class<T> type) {
+    public void setObj(List<T> obj, Class<T> type) {
         this.setObj(obj);
         this.setType(type);
     }
@@ -85,10 +88,10 @@ public class GenericServiceAPIResponseEntity<T>{
      * Set the first object's class as type
      */
     @SuppressWarnings("unused")
-    public void setTypeByObj(){
-        for(T t:this.obj){
-            if(this.type == null && t!=null){
-                this.type = (Class<T>) t.getClass();
+    public void setTypeByObj() {
+        for (T t : this.obj) {
+            if (this.type == null && t != null) {
+                this.type = (Class<T>)t.getClass();
             }
         }
     }
@@ -102,17 +105,19 @@ public class GenericServiceAPIResponseEntity<T>{
         this.type = type;
     }
 
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-
-    public void setException(Exception exceptionObj){
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(Exception exceptionObj) {
         this.exception = EagleExceptionWrapper.wrap(exceptionObj);
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 836295b..8ccb43a 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -30,57 +30,61 @@ import java.io.IOException;
 import java.util.*;
 
 /**
- * @since 3/18/15
+ * @since 3/18/15.
  */
-public class GenericServiceAPIResponseEntityDeserializer extends 
JsonDeserializer<GenericServiceAPIResponseEntity> {
-    private final static String META_FIELD="meta";
-    private final static String SUCCESS_FIELD="success";
-    private final static String EXCEPTION_FIELD="exception";
-    private final static String OBJ_FIELD="obj";
-    private final static String TYPE_FIELD="type";
+public class GenericServiceAPIResponseEntityDeserializer
+    extends JsonDeserializer<GenericServiceAPIResponseEntity> {
+    private static final String META_FIELD = "meta";
+    private static final String SUCCESS_FIELD = "success";
+    private static final String EXCEPTION_FIELD = "exception";
+    private static final String OBJ_FIELD = "obj";
+    private static final String TYPE_FIELD = "type";
 
     @Override
-    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, 
DeserializationContext ctxt) throws IOException, JsonProcessingException {
+    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, 
DeserializationContext ctxt)
+        throws IOException, JsonProcessingException {
         GenericServiceAPIResponseEntity entity = new 
GenericServiceAPIResponseEntity();
         ObjectCodec objectCodec = jp.getCodec();
 
         JsonNode rootNode = jp.getCodec().readTree(jp);
-        if(rootNode.isObject()){
-            Iterator<Map.Entry<String,JsonNode>> fields = rootNode.fields();
+        if (rootNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields();
             JsonNode objNode = null;
-            while(fields.hasNext()){
-                Map.Entry<String,JsonNode> field = fields.next();
-                if (META_FIELD.equals(field.getKey()) && field.getValue() != 
null)
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                if (META_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class));
-                else if(SUCCESS_FIELD.equals(field.getKey()) && 
field.getValue() != null){
+                } else if (SUCCESS_FIELD.equals(field.getKey()) && 
field.getValue() != null) {
                     entity.setSuccess(field.getValue().booleanValue());
-                }else if(EXCEPTION_FIELD.equals(field.getKey()) && 
field.getValue() != null){
+                } else if (EXCEPTION_FIELD.equals(field.getKey()) && 
field.getValue() != null) {
                     entity.setException(new 
Exception(field.getValue().textValue()));
-                }else if(TYPE_FIELD.endsWith(field.getKey())  && 
field.getValue() != null){
-                    
Preconditions.checkNotNull(field.getValue().textValue(),"Response type class is 
null");
+                } else if (TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null) {
+                    Preconditions.checkNotNull(field.getValue().textValue(), "Response type class is null");
                     try {
                         
entity.setType(Class.forName(field.getValue().textValue()));
                     } catch (ClassNotFoundException e) {
                         throw new IOException(e);
                     }
-                }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() 
!= null){
+                } else if (OBJ_FIELD.equals(field.getKey()) && 
field.getValue() != null) {
                     objNode = field.getValue();
                 }
             }
 
-            if(objNode!=null) {
-                JavaType collectionType=null;
+            if (objNode != null) {
+                JavaType collectionType = null;
                 if (entity.getType() != null) {
-                    collectionType = 
TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, 
entity.getType());
-                }else{
-                    collectionType = 
TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, 
Map.class);
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           entity.getType());
+                } else {
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           Map.class);
                 }
                 List obj = objectCodec.readValue(objNode.traverse(), 
collectionType);
                 entity.setObj(obj);
             }
-        }else{
+        } else {
             throw new IOException("root node is not object");
         }
         return entity;
     }
-}
\ No newline at end of file
+}
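
The custom deserializer above reads the five top-level fields (meta, success, exception, type, obj), resolves type via Class.forName, and then materializes obj as a LinkedList of that type (or of Map when no type is given). A hypothetical round trip through Jackson, with an invented JSON payload carrying String elements:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;

    public class ResponseParseSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative payload; the field names follow the deserializer, the values are made up.
            String json = "{\"meta\":{\"elapsedms\":12},\"success\":true,"
                + "\"type\":\"java.lang.String\",\"obj\":[\"a\",\"b\"]}";
            GenericServiceAPIResponseEntity<?> response =
                new ObjectMapper().readValue(json, GenericServiceAPIResponseEntity.class);
            System.out.println(response.isSuccess()); // prints: true
        }
    }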

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
index 7a38033..32f382b 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
@@ -30,216 +30,232 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class HBaseInternalLogHelper {
-       private final static Logger LOG  = 
LoggerFactory.getLogger(HBaseInternalLogHelper.class);
-
-       private static final EntitySerDeserializer ENTITY_SERDESER = new 
EntitySerDeserializer();
-
-       /**
-        *
-        * @param ed
-        * @param r
-        * @param qualifiers if null, return all qualifiers defined in ed
-        * @return
-        */
-       public static InternalLog parse(EntityDefinition ed, Result r, byte[][] 
qualifiers) {
-               final byte[] row = r.getRow();
-               // skip the first 4 bytes : prefix
-               final int offset = (ed.getPartitions() == null) ? (4) : (4 + 
ed.getPartitions().length * 4);
-               long timestamp = ByteUtil.bytesToLong(row, offset);
-               // reverse timestamp
-               timestamp = Long.MAX_VALUE - timestamp;
-               final byte[] family = ed.getColumnFamily().getBytes();
-               final Map<String, byte[]> allQualifierValues = new 
HashMap<String, byte[]>();
-
-               if (qualifiers != null) {
-                       int count = qualifiers.length;
-                       final byte[][] values = new byte[count][];
-                       for (int i = 0; i < count; i++) {
-                               // TODO if returned value is null, it means no 
this column for this row, so why set null to the object?
-                               values[i] = r.getValue(family, qualifiers[i]);
-                               allQualifierValues.put(new 
String(qualifiers[i]), values[i]);
-                       }
-               }else{
-                       // return all qualifiers
-                       for(KeyValue kv:r.list()){
-                               byte[] qualifier = kv.getQualifier();
-                               byte[] value = kv.getValue();
-                               allQualifierValues.put(new 
String(qualifier),value);
-                       }
-               }
-               final InternalLog log = buildObject(ed, row, timestamp, 
allQualifierValues);
-               return log;
-       }
-
-       /**
-        *
-        * @param ed
-        * @param row
-        * @param timestamp
-        * @param allQualifierValues <code>Map &lt; Qualifier name (not display 
name),Value in bytes array &gt;</code>
-        * @return
-        */
-       public static InternalLog buildObject(EntityDefinition ed, byte[] row, 
long timestamp, Map<String, byte[]> allQualifierValues) {
-               InternalLog log = new InternalLog();
-               String myRow = 
EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
-               log.setEncodedRowkey(myRow);
-               log.setPrefix(ed.getPrefix());
-               log.setTimestamp(timestamp);
-
-               Map<String, byte[]> logQualifierValues = new HashMap<String, 
byte[]>();
-               Map<String, String> logTags = new HashMap<String, String>();
-               Map<String, Object> extra = null;
-
-               Map<String,Double> doubleMap = null;
-               // handle with metric
-               boolean isMetricEntity = 
GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
-               double[] metricValueArray = null;
-
-               for (Map.Entry<String, byte[]> entry : 
allQualifierValues.entrySet()) {
-                       if (ed.isTag(entry.getKey())) {
-                               if (entry.getValue() != null) {
-                                       logTags.put(entry.getKey(), new 
String(entry.getValue()));
-                               }else if 
(TokenConstant.isExpression(entry.getKey())){
-                                       if(doubleMap == null) doubleMap = 
EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
-                                       // Caculate expression based fields
-                                       String expression = 
TokenConstant.parseExpressionContent(entry.getKey());
-                                       if (extra == null) extra = new 
HashMap<String, Object>();
-
-                                       // Evaluation expression as output 
based on entity
-                                       // 
-----------------------------------------------
-                                       // 1) Firstly, check whether is metric 
entity and expression requires value and also value is not number (i.e. 
double[])
-                                       // 2) Treat all required fields as 
double, if not number, then set result as NaN
-
-                                       try {
-                                               ExpressionParser parser = 
ExpressionParser.parse(expression);
-                                               boolean isRequiringValue = 
parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD);
-
-                                               if(isMetricEntity && 
isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null
-                                                               && 
Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // 
EntityQualifierUtils will convert non-number field into Double.NaN
-                                               {
-                                                       // if dependent fields 
require "value"
-                                                       // and value exists but 
value's type is double[] instead of double
-
-                                                       // handle with metric 
value array based expression
-                                                       // lazily extract 
metric value as double array if required
-                                                       if(metricValueArray == 
null){
-                                                               // 
if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
-                                                               Qualifier 
qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
-                                                               EntitySerDeser 
serDeser = qualifier.getSerDeser();
-                                                               if(serDeser 
instanceof DoubleArraySerDeser){
-                                                                       byte[] 
value = allQualifierValues.get(qualifier.getQualifierName());
-                                                                       
if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value);
-                                                               }
-                                                               // }
-                                                       }
-
-                                                       
if(metricValueArray!=null){
-                                                               double[] 
resultBucket = new double[metricValueArray.length];
-                                                               Map<String, 
Double> _doubleMap = new HashMap<String,Double>(doubleMap);
-                                                               
_doubleMap.remove(entry.getKey());
-                                                               for(int i=0;i< 
resultBucket.length;i++) {
-                                                                       
_doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
-                                                                       
resultBucket[i]=  parser.eval(_doubleMap);
-                                                               }
-                                                               
extra.put(expression,resultBucket);
-                                                       }else{
-                                                               
LOG.warn("Failed convert metric value into double[] type which is required by 
expression: "+expression);
-                                                               // if require 
value in double[] is NaN
-                                                               double value = 
parser.eval(doubleMap);
-                                                               
extra.put(expression, value);
-                                                       }
-                                               }else {
-                                                       double value = 
parser.eval(doubleMap);
-                                                       extra.put(expression, 
value);
-                                                       // LOG.info("DEBUG: 
"+entry.getKey()+" = "+ value);
-                                               }
-                                       } catch (Exception e) {
-                                               LOG.error("Failed to eval 
expression "+expression+", exception: "+e.getMessage(),e);
-                                       }
-                               }
-                       } else {
-                               
logQualifierValues.put(entry.getKey(),entry.getValue());
-                       }
-               }
-               log.setQualifierValues(logQualifierValues);
-               log.setTags(logTags);
-               log.setExtraValues(extra);
-               return log;
-       }
-       
-       public static TaggedLogAPIEntity buildEntity(InternalLog log, 
EntityDefinition entityDef) throws Exception {
-               Map<String, byte[]> qualifierValues = log.getQualifierValues();
-               TaggedLogAPIEntity entity = 
ENTITY_SERDESER.readValue(qualifierValues, entityDef);
-               if (entity.getTags() == null && log.getTags() != null) {
-                       entity.setTags(log.getTags());
-               }
-               entity.setExp(log.getExtraValues());
-               entity.setTimestamp(log.getTimestamp());
-               entity.setEncodedRowkey(log.getEncodedRowkey());
-               entity.setPrefix(log.getPrefix());
-               return entity;
-       }
-       
-       public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> 
logs, EntityDefinition entityDef) throws Exception {
-               final List<TaggedLogAPIEntity> result = new 
ArrayList<TaggedLogAPIEntity>(logs.size());
-               for (InternalLog log : logs) {
-                       result.add(buildEntity(log, entityDef));
-               }
-               return result;
-       }
-       
-       public static byte[][] getOutputQualifiers(EntityDefinition entityDef, 
List<String> outputFields) {
-               final byte[][] result = new byte[outputFields.size()][];
-               int index = 0;
-               for(String field : outputFields){
-                       // convert displayName to qualifierName
-                       Qualifier q = entityDef.getDisplayNameMap().get(field);
-                       if(q == null){ // for tag case
-                               result[index++] = field.getBytes();
-                       }else{ // for qualifier case
-                               result[index++] = 
q.getQualifierName().getBytes();
-                       }
-               }
-               return result;
-       }
-
-       public static InternalLog convertToInternalLog(TaggedLogAPIEntity 
entity, EntityDefinition entityDef) throws Exception {
-               final InternalLog log = new InternalLog();
-               final Map<String, String> inputTags = entity.getTags();
-               final Map<String, String> tags = new TreeMap<String, String>();
-               if(inputTags!=null) {
-                       for (Map.Entry<String, String> entry : 
inputTags.entrySet()) {
-                               tags.put(entry.getKey(), entry.getValue());
-                       }
-               }
-               log.setTags(tags);
-               if(entityDef.isTimeSeries()){
-                       log.setTimestamp(entity.getTimestamp());
-               }else{
-                       
log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to 
MAX, then actually stored 0
-               }
-               
-               // For Metric entity, prefix is populated along with entity 
instead of EntityDefinition
-               if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){
-                       log.setPrefix(entity.getPrefix());
-               }else{
-                       log.setPrefix(entityDef.getPrefix());
-               }
-               
-               log.setPartitions(entityDef.getPartitions());
-               EntitySerDeserializer des = new EntitySerDeserializer();
-               log.setQualifierValues(des.writeValue(entity, entityDef));
-               
-               final IndexDefinition[] indexDefs = entityDef.getIndexes();
-               if (indexDefs != null) {
-                       final List<byte[]> indexRowkeys = new 
ArrayList<byte[]>();
-                       for (int i = 0; i < indexDefs.length; ++i) {
-                               final IndexDefinition indexDef = indexDefs[i];
-                               final byte[] indexRowkey = 
indexDef.generateIndexRowkey(entity);
-                               indexRowkeys.add(indexRowkey);
-                       }
-                       log.setIndexRowkeys(indexRowkeys);
-               }
-               return log;
-       }
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseInternalLogHelper.class);
+
+    private static final EntitySerDeserializer ENTITY_SERDESER = new 
EntitySerDeserializer();
+
+    /**
+     * @param ed entity definition describing the row's schema
+     * @param r HBase result to parse
+     * @param qualifiers qualifiers to read; if null, all qualifiers present in the result are returned
+     * @return the parsed InternalLog
+     */
+    public static InternalLog parse(EntityDefinition ed, Result r, byte[][] 
qualifiers) {
+        final byte[] row = r.getRow();
+        // skip the 4-byte prefix, plus 4 bytes per partition when partitions are defined
+        final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
+        long timestamp = ByteUtil.bytesToLong(row, offset);
+        // reverse timestamp
+        timestamp = Long.MAX_VALUE - timestamp;
+        final byte[] family = ed.getColumnFamily().getBytes();
+        final Map<String, byte[]> allQualifierValues = new HashMap<String, 
byte[]>();
+
+        if (qualifiers != null) {
+            int count = qualifiers.length;
+            final byte[][] values = new byte[count][];
+            for (int i = 0; i < count; i++) {
+                // TODO if the returned value is null, the row has no such column,
+                // so why put a null entry into the map?
+                values[i] = r.getValue(family, qualifiers[i]);
+                allQualifierValues.put(new String(qualifiers[i]), values[i]);
+            }
+        } else {
+            // return all qualifiers
+            for (KeyValue kv : r.list()) {
+                byte[] qualifier = kv.getQualifier();
+                byte[] value = kv.getValue();
+                allQualifierValues.put(new String(qualifier), value);
+            }
+        }
+        final InternalLog log = buildObject(ed, row, timestamp, 
allQualifierValues);
+        return log;
+    }
+
+    /**
+     * @param ed entity definition
+     * @param row raw HBase rowkey bytes
+     * @param timestamp timestamp already decoded from the rowkey
+     * @param allQualifierValues
+     *            <code>Map &lt;qualifier name (not display name), value as byte array&gt;</code>
+     * @return the populated InternalLog
+     */
+    public static InternalLog buildObject(EntityDefinition ed, byte[] row, 
long timestamp,
+                                          Map<String, byte[]> 
allQualifierValues) {
+        InternalLog log = new InternalLog();
+        String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
+        log.setEncodedRowkey(myRow);
+        log.setPrefix(ed.getPrefix());
+        log.setTimestamp(timestamp);
+
+        Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
+        Map<String, String> logTags = new HashMap<String, String>();
+        Map<String, Object> extra = null;
+
+        Map<String, Double> doubleMap = null;
+        // special handling for metric entities
+        boolean isMetricEntity = 
GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
+        double[] metricValueArray = null;
+
+        for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
+            if (ed.isTag(entry.getKey())) {
+                if (entry.getValue() != null) {
+                    logTags.put(entry.getKey(), new String(entry.getValue()));
+                } else if (TokenConstant.isExpression(entry.getKey())) {
+                    if (doubleMap == null) {
+                        doubleMap = 
EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
+                    }
+                    // Calculate expression-based fields
+                    String expression = 
TokenConstant.parseExpressionContent(entry.getKey());
+                    if (extra == null) {
+                        extra = new HashMap<String, Object>();
+                    }
+
+                    // Evaluate the expression against this entity and emit the result
+                    // -----------------------------------------------
+                    // 1) First, check whether this is a metric entity, the expression depends on "value",
+                    //    and the value is not a plain number (i.e. it is a double[])
+                    // 2) Treat all required fields as double; non-numeric fields evaluate as NaN
+
+                    try {
+                        ExpressionParser parser = 
ExpressionParser.parse(expression);
+                        boolean isRequiringValue = parser.getDependentFields()
+                            .contains(GenericMetricEntity.VALUE_FIELD);
+
+                        if (isMetricEntity && isRequiringValue
+                            && doubleMap.get(GenericMetricEntity.VALUE_FIELD) 
!= null
+                            && 
Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) {
+                            // EntityQualifierUtils converts any non-numeric field into Double.NaN,
+                            // so reaching here means the expression depends on "value" and the value
+                            // exists but is stored as double[] rather than double
+
+                            // handle the expression against the metric value array:
+                            // lazily deserialize the metric value into a double array when required
+                            if (metricValueArray == null) {
+                                // 
if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
+                                Qualifier qualifier = ed.getDisplayNameMap()
+                                    .get(GenericMetricEntity.VALUE_FIELD);
+                                EntitySerDeser serDeser = 
qualifier.getSerDeser();
+                                if (serDeser instanceof DoubleArraySerDeser) {
+                                    byte[] value = 
allQualifierValues.get(qualifier.getQualifierName());
+                                    if (value != null) {
+                                        metricValueArray = 
(double[])serDeser.deserialize(value);
+                                    }
+                                }
+                                // }
+                            }
+
+                            if (metricValueArray != null) {
+                                double[] resultBucket = new 
double[metricValueArray.length];
+                                Map<String, Double> _doubleMap = new 
HashMap<String, Double>(doubleMap);
+                                _doubleMap.remove(entry.getKey());
+                                for (int i = 0; i < resultBucket.length; i++) {
+                                    
_doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
+                                    resultBucket[i] = parser.eval(_doubleMap);
+                                }
+                                extra.put(expression, resultBucket);
+                            } else {
+                                LOG.warn("Failed convert metric value into 
double[] type which is required by expression: "
+                                         + expression);
+                                // if require value in double[] is NaN
+                                double value = parser.eval(doubleMap);
+                                extra.put(expression, value);
+                            }
+                        } else {
+                            double value = parser.eval(doubleMap);
+                            extra.put(expression, value);
+                            // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to eval expression " + expression + 
", exception: "
+                                  + e.getMessage(), e);
+                    }
+                }
+            } else {
+                logQualifierValues.put(entry.getKey(), entry.getValue());
+            }
+        }
+        log.setQualifierValues(logQualifierValues);
+        log.setTags(logTags);
+        log.setExtraValues(extra);
+        return log;
+    }
+
+    public static TaggedLogAPIEntity buildEntity(InternalLog log, 
EntityDefinition entityDef)
+        throws Exception {
+        Map<String, byte[]> qualifierValues = log.getQualifierValues();
+        TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, 
entityDef);
+        if (entity.getTags() == null && log.getTags() != null) {
+            entity.setTags(log.getTags());
+        }
+        entity.setExp(log.getExtraValues());
+        entity.setTimestamp(log.getTimestamp());
+        entity.setEncodedRowkey(log.getEncodedRowkey());
+        entity.setPrefix(log.getPrefix());
+        return entity;
+    }
+
+    public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> 
logs, EntityDefinition entityDef)
+        throws Exception {
+        final List<TaggedLogAPIEntity> result = new 
ArrayList<TaggedLogAPIEntity>(logs.size());
+        for (InternalLog log : logs) {
+            result.add(buildEntity(log, entityDef));
+        }
+        return result;
+    }
+
+    public static byte[][] getOutputQualifiers(EntityDefinition entityDef, 
List<String> outputFields) {
+        final byte[][] result = new byte[outputFields.size()][];
+        int index = 0;
+        for (String field : outputFields) {
+            // convert displayName to qualifierName
+            Qualifier q = entityDef.getDisplayNameMap().get(field);
+            if (q == null) { // for tag case
+                result[index++] = field.getBytes();
+            } else { // for qualifier case
+                result[index++] = q.getQualifierName().getBytes();
+            }
+        }
+        return result;
+    }
+
+    public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, 
EntityDefinition entityDef)
+        throws Exception {
+        final InternalLog log = new InternalLog();
+        final Map<String, String> inputTags = entity.getTags();
+        final Map<String, String> tags = new TreeMap<String, String>();
+        if (inputTags != null) {
+            for (Map.Entry<String, String> entry : inputTags.entrySet()) {
+                tags.put(entry.getKey(), entry.getValue());
+            }
+        }
+        log.setTags(tags);
+        if (entityDef.isTimeSeries()) {
+            log.setTimestamp(entity.getTimestamp());
+        } else {
+            // non-time-series: use the fixed write timestamp (MAX), which the reversed rowkey stores as 0
+            log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP);
+        }
+
+        // For metric entities, the prefix is populated on the entity itself rather than taken from the EntityDefinition
+        if (entity.getPrefix() != null && !entity.getPrefix().isEmpty()) {
+            log.setPrefix(entity.getPrefix());
+        } else {
+            log.setPrefix(entityDef.getPrefix());
+        }
+
+        log.setPartitions(entityDef.getPartitions());
+        EntitySerDeserializer des = new EntitySerDeserializer();
+        log.setQualifierValues(des.writeValue(entity, entityDef));
+
+        final IndexDefinition[] indexDefs = entityDef.getIndexes();
+        if (indexDefs != null) {
+            final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
+            for (int i = 0; i < indexDefs.length; ++i) {
+                final IndexDefinition indexDef = indexDefs[i];
+                final byte[] indexRowkey = 
indexDef.generateIndexRowkey(entity);
+                indexRowkeys.add(indexRowkey);
+            }
+            log.setIndexRowkeys(indexRowkeys);
+        }
+        return log;
+    }
 }
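
A note on the rowkey handling in HBaseInternalLogHelper.parse() above: the timestamp is stored in
the rowkey as Long.MAX_VALUE - timestamp so that newer rows sort first in HBase, and parse()
reverses it back after skipping the 4-byte prefix and any partition bytes. The following is a
minimal, self-contained sketch of that round trip; the prefix value and the ByteBuffer-based
encoding are stand-ins for illustration only, not the project's actual RowkeyBuilder.

import java.nio.ByteBuffer;

public class ReversedTimestampSketch {
    // write side: store Long.MAX_VALUE - timestamp so newer rows sort first
    static byte[] encode(int prefix, long timestamp) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8);   // no partitions in this sketch
        buf.putInt(prefix);
        buf.putLong(Long.MAX_VALUE - timestamp);
        return buf.array();
    }

    // read side: mirrors what parse() does once the offset past prefix/partitions is known
    static long decode(byte[] row, int offset) {
        long reversed = ByteBuffer.wrap(row, offset, 8).getLong();
        return Long.MAX_VALUE - reversed;
    }

    public static void main(String[] args) {
        long ts = System.currentTimeMillis();
        byte[] row = encode(0x0000abcd, ts);
        System.out.println(decode(row, 4) == ts);      // prints: true
    }
}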

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
index c8b9a33..d5c8e2c 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
@@ -28,59 +28,62 @@ import java.util.Date;
 import java.util.List;
 
 public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> {
-       protected ResultScanner rs;
+    protected ResultScanner rs;
 
-       public HBaseLogReader2(EntityDefinition ed, List<String> partitions, 
Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] 
outputQualifiers) {
-               super(ed, partitions, startTime, endTime, filter, lastScanKey, 
outputQualifiers);
-       }
+    public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date 
startTime, Date endTime,
+                           Filter filter, String lastScanKey, byte[][] 
outputQualifiers) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, 
outputQualifiers);
+    }
 
-       /**
-        * This constructor supports partition.
-        *
-        * @param ed               entity definition
-        * @param partitions       partition values, which is sorted in 
partition definition order. TODO: in future we need to support
-        *                         multiple values for one partition field
-        * @param startTime        start time of the query
-        * @param endTime          end time of the query
-        * @param filter           filter for the hbase scan
-        * @param lastScanKey      the key of last scan
-        * @param outputQualifiers the bytes of output qualifier names
-        * @param prefix           can be populated from outside world 
specifically for generic metric reader
-        */
-       public HBaseLogReader2(EntityDefinition ed, List<String> partitions, 
Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] 
outputQualifiers, String prefix) {
-               super(ed, partitions, startTime, endTime, filter, lastScanKey, 
outputQualifiers, prefix);
-       }
+    /**
+     * This constructor supports partition.
+     *
+     * @param ed entity definition
+     * @param partitions partition values, sorted in partition definition order. TODO: in the future we
+     *            need to support multiple values for one partition field
+     * @param startTime start time of the query
+     * @param endTime end time of the query
+     * @param filter filter for the hbase scan
+     * @param lastScanKey the key of last scan
+     * @param outputQualifiers the bytes of output qualifier names
+     * @param prefix can be populated from outside world specifically for 
generic metric reader
+     */
+    public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date 
startTime, Date endTime,
+                           Filter filter, String lastScanKey, byte[][] 
outputQualifiers, String prefix) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, 
outputQualifiers, prefix);
+    }
 
-       @Override
-       protected void onOpen(HTableInterface tbl, Scan scan) throws 
IOException {
-               rs = tbl.getScanner(scan);
-       }
+    @Override
+    protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+        rs = tbl.getScanner(scan);
+    }
 
-       /**
-        * <h2>Close:</h2>
-        * 1. Call super.close(): release current table connection <br></br>
-        * 2. Close Scanner<br></br>
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               super.close();
-               if(rs != null){
-                       rs.close();
-               }
-       }
+    /**
+     * <h2>Close:</h2>
+     * 1. Call super.close() to release the current table connection.<br>
+     * 2. Close the scanner.<br>
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        if (rs != null) {
+            rs.close();
+        }
+    }
 
-       @Override
-       public InternalLog read() throws IOException {
-               if (rs == null)
-                       throw new IllegalArgumentException(
-                                       "ResultScanner must be initialized 
before reading");
-               InternalLog t = null;
-               Result r = rs.next();
-               if (r != null) {
-                       t = HBaseInternalLogHelper.parse(_ed, r, qualifiers);
-               }
-               return t;
-       }
+    @Override
+    public InternalLog read() throws IOException {
+        if (rs == null) {
+            throw new IllegalArgumentException("ResultScanner must be 
initialized before reading");
+        }
+        InternalLog t = null;
+        Result r = rs.next();
+        if (r != null) {
+            t = HBaseInternalLogHelper.parse(ed, r, qualifiers);
+        }
+        return t;
+    }
 }
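
For context on how HBaseLogReader2 is typically consumed: read() returns one InternalLog per HBase
row and null at the end of the scan, while close() releases both the table and the ResultScanner.
Below is a hedged usage sketch, not code from the commit: it assumes the AbstractHBaseLogReader
base class exposes a public open() that drives the onOpen(table, scan) callback shown above, and
ed, startTime, endTime and qualifiers are caller-supplied placeholders (partitions, filter and
lastScanKey are left null for simplicity).

static List<TaggedLogAPIEntity> scanEntities(EntityDefinition ed, Date startTime, Date endTime,
                                             byte[][] qualifiers) throws Exception {
    // null partitions / filter / lastScanKey: assumed to mean no partition, no extra filter, scan from start
    HBaseLogReader2 reader = new HBaseLogReader2(ed, null, startTime, endTime, null, null, qualifiers);
    List<InternalLog> logs = new ArrayList<InternalLog>();
    try {
        reader.open();                                   // assumed lifecycle method on the base class
        InternalLog log;
        while ((log = reader.read()) != null) {          // null marks the end of the scan
            logs.add(log);
        }
    } finally {
        reader.close();                                  // closes the table connection and the scanner
    }
    return HBaseInternalLogHelper.buildEntities(logs, ed);
}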

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
index 059ee7f..1cf23b6 100644
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
@@ -29,124 +29,125 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseLogWriter implements LogWriter {
-       private static Logger LOG = 
LoggerFactory.getLogger(HBaseLogWriter.class);
-       private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
-       
-       private HTableInterface tbl;
-       private String table;
-       private String columnFamily;
-       
-       public HBaseLogWriter(String table, String columnFamily) {
-               // TODO assert for non-null of table and columnFamily
-               this.table = table;
-               this.columnFamily = columnFamily;
-       }
-       
-       @Override
-       public void open() throws IOException {
-               try{
-                       tbl = EagleConfigFactory.load().getHTable(this.table);
-//                     LOGGER.info("HBase table " + table + " audo reflush is 
" + (tbl.isAutoFlush() ? "enabled" : "disabled"));
-               }catch(Exception ex){
-                       LOG.error("Cannot create htable", ex);
-                       throw new IOException(ex);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-       }
-
-       @Override
-       public void flush() throws IOException {
-               tbl.flushCommits();
-       }
-       
-       protected void populateColumnValues(Put p, InternalLog log){
-               Map<String, byte[]> qualifierValues = log.getQualifierValues();
-               // iterate all qualifierValues
-               for(Map.Entry<String, byte[]> entry : 
qualifierValues.entrySet()){
-                       p.add(columnFamily.getBytes(), 
entry.getKey().getBytes(), entry.getValue());
-               }
-               
-               Map<String, String> tags = log.getTags();
-               // iterate all tags, each tag will be stored as a column 
qualifier
-               if(tags != null){
-                       for(Map.Entry<String, String> entry : tags.entrySet()){
-                               // TODO need a consistent handling of null 
values
-                               if(entry.getValue() != null)
-                                       p.add(columnFamily.getBytes(), 
entry.getKey().getBytes(), entry.getValue().getBytes());
-                       }
-               }
-       }
-
-       /**
-        * TODO need think about if multi-PUT is necessary, by checking if 
autoFlush works
-        */
-       @Override
-       public byte[] write(InternalLog log) throws IOException{
-               final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-               final Put p = new Put(rowkey);
-               populateColumnValues(p, log);
-               tbl.put(p);
-               final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-               if (indexRowkeys != null) {
-                       writeIndexes(rowkey, indexRowkeys);
-               }
-               return rowkey;
-       }
-
-       /**
-        * TODO need think about if multi-PUT is necessary, by checking if 
autoFlush works
-        */
-       public List<byte[]> write(List<InternalLog> logs) throws IOException{
-               final List<Put> puts = new ArrayList<Put>(logs.size());
-               final List<byte[]> result = new ArrayList<byte[]>(logs.size());
-               for (InternalLog log : logs) {
-                       final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-                       final Put p = new Put(rowkey);
-                       populateColumnValues(p, log);
-                       puts.add(p);
-                       final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-                       if (indexRowkeys != null) {
-                               writeIndexes(rowkey, indexRowkeys, puts);
-                       }
-                       result.add(rowkey);
-               }
-               tbl.put(puts);
-               return result;
-       }
-       
-       @Override
-       public void updateByRowkey(byte[] rowkey, InternalLog log) throws 
IOException{
-               Put p = new Put(rowkey);
-               populateColumnValues(p, log);
-               tbl.put(p);
-               final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-               if (indexRowkeys != null) {
-                       writeIndexes(rowkey, indexRowkeys);
-               }
-       }
-
-       private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) 
throws IOException {
-               for (byte[] indexRowkey : indexRowkeys) {
-                       Put p = new Put(indexRowkey);
-                       p.add(columnFamily.getBytes(), rowkey, 
EMPTY_INDEX_QUALIFER_VALUE);
-                       tbl.put(p);
-               }
-       }
-
-       private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, 
List<Put> puts) throws IOException {
-               for (byte[] indexRowkey : indexRowkeys) {
-                       Put p = new Put(indexRowkey);
-                       p.add(columnFamily.getBytes(), rowkey, 
EMPTY_INDEX_QUALIFER_VALUE);
-                       puts.add(p);
-//                     tbl.put(p);
-               }
-       }
-
-       
+    private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
+    private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
+
+    private HTableInterface tbl;
+    private String table;
+    private String columnFamily;
+
+    public HBaseLogWriter(String table, String columnFamily) {
+        // TODO assert that table and columnFamily are non-null
+        this.table = table;
+        this.columnFamily = columnFamily;
+    }
+
+    @Override
+    public void open() throws IOException {
+        try {
+            tbl = EagleConfigFactory.load().getHTable(this.table);
+            // LOGGER.info("HBase table " + table + " auto flush is " + (tbl.isAutoFlush() ? "enabled" :
+            // "disabled"));
+        } catch (Exception ex) {
+            LOG.error("Cannot create htable", ex);
+            throw new IOException(ex);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        tbl.flushCommits();
+    }
+
+    protected void populateColumnValues(Put p, InternalLog log) {
+        Map<String, byte[]> qualifierValues = log.getQualifierValues();
+        // iterate all qualifierValues
+        for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) {
+            p.add(columnFamily.getBytes(), entry.getKey().getBytes(), 
entry.getValue());
+        }
+
+        Map<String, String> tags = log.getTags();
+        // iterate all tags, each tag will be stored as a column qualifier
+        if (tags != null) {
+            for (Map.Entry<String, String> entry : tags.entrySet()) {
+                // TODO need a consistent handling of null values
+                if (entry.getValue() != null) {
+                    p.add(columnFamily.getBytes(), entry.getKey().getBytes(), 
entry.getValue().getBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * TODO need to think about whether multi-PUT is necessary, by checking whether autoFlush works
+     */
+    @Override
+    public byte[] write(InternalLog log) throws IOException {
+        final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+        final Put p = new Put(rowkey);
+        populateColumnValues(p, log);
+        tbl.put(p);
+        final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+        if (indexRowkeys != null) {
+            writeIndexes(rowkey, indexRowkeys);
+        }
+        return rowkey;
+    }
+
+    /**
+     * TODO need to think about whether multi-PUT is necessary, by checking whether autoFlush works
+     */
+    public List<byte[]> write(List<InternalLog> logs) throws IOException {
+        final List<Put> puts = new ArrayList<Put>(logs.size());
+        final List<byte[]> result = new ArrayList<byte[]>(logs.size());
+        for (InternalLog log : logs) {
+            final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+            final Put p = new Put(rowkey);
+            populateColumnValues(p, log);
+            puts.add(p);
+            final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+            if (indexRowkeys != null) {
+                writeIndexes(rowkey, indexRowkeys, puts);
+            }
+            result.add(rowkey);
+        }
+        tbl.put(puts);
+        return result;
+    }
+
+    @Override
+    public void updateByRowkey(byte[] rowkey, InternalLog log) throws 
IOException {
+        Put p = new Put(rowkey);
+        populateColumnValues(p, log);
+        tbl.put(p);
+        final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+        if (indexRowkeys != null) {
+            writeIndexes(rowkey, indexRowkeys);
+        }
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws 
IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            tbl.put(p);
+        }
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, 
List<Put> puts) throws IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            puts.add(p);
+            // tbl.put(p);
+        }
+    }
+
 }
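
To tie HBaseLogWriter back to the helper above: a write goes TaggedLogAPIEntity ->
HBaseInternalLogHelper.convertToInternalLog() -> write(), which builds the rowkey, puts the
columns, and also writes one index row per IndexDefinition. A minimal sketch of that path follows;
it assumes the caller supplies the HBase table and column family names (they are plain constructor
arguments here, not derived from the EntityDefinition in this sketch).

static byte[] writeEntity(TaggedLogAPIEntity entity, EntityDefinition ed,
                          String table, String columnFamily) throws Exception {
    InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, ed);
    HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily);
    try {
        writer.open();
        byte[] rowkey = writer.write(log);   // also writes index rows when indexes are defined
        writer.flush();                      // flushCommits on the underlying HTableInterface
        return rowkey;
    } finally {
        writer.close();
    }
}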

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
index 8276640..066401f 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
@@ -25,115 +25,134 @@ import java.util.Map;
  * TODO we should decouple BaseLog during write time and BaseLog during read 
time
  */
 public class InternalLog {
-       private String encodedRowkey;
-       private String prefix;
-       private String[] partitions;
-       private long timestamp;
-       private Map<String, byte[]> qualifierValues;
-
-       private Map<String,Object> extraValues;
-       private Map<String, String> tags;
-       private Map<String, List<String>> searchTags;
-       private List<byte[]> indexRowkeys;
-
-       public String getEncodedRowkey() {
-               return encodedRowkey;
-       }
-
-       public void setEncodedRowkey(String encodedRowkey) {
-               this.encodedRowkey = encodedRowkey;
-       }
-       
-       public Map<String, byte[]> getQualifierValues() {
-               return qualifierValues;
-       }
-       public void setQualifierValues(Map<String, byte[]> qualifierValues) {
-               this.qualifierValues = qualifierValues;
-       }
-
-       public Map<String, List<String>> getSearchTags() {
-               return searchTags;
-       }
-       public void setSearchTags(Map<String, List<String>> searchTags) {
-               this.searchTags = searchTags;
-       }
-       public String getPrefix() {
-               return prefix;
-       }
-       public void setPrefix(String prefix) {
-               this.prefix = prefix;
-       }
-       public String[] getPartitions() {
-               return partitions;
-       }
-       public void setPartitions(String[] partitions) {
-               this.partitions = partitions;
-       }
-       public long getTimestamp() {
-               return timestamp;
-       }
-       public void setTimestamp(long timestamp) {
-               this.timestamp = timestamp;
-       }
-       public Map<String, String> getTags() {
-               return tags;
-       }
-       public void setTags(Map<String, String> tags) {
-               this.tags = tags;
-       }
-       public List<byte[]> getIndexRowkeys() {
-               return indexRowkeys;
-       }
-       public void setIndexRowkeys(List<byte[]> indexRowkeys) {
-               this.indexRowkeys = indexRowkeys;
-       }
-       public Map<String, Object> getExtraValues() { return extraValues; }
-       public void setExtraValues(Map<String, Object> extraValues) { 
this.extraValues = extraValues; }
-
-       public String toString(){
-               StringBuffer sb = new StringBuffer();
-               sb.append(prefix);
-               sb.append("|");
-               
sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
-               sb.append("(");
-               sb.append(timestamp);
-               sb.append(")");
-               sb.append("|searchTags:");
-               if(searchTags != null){
-                       for(String tagkey : searchTags.keySet()){
-                               sb.append(tagkey);
-                               sb.append('=');
-                               List<String> tagValues = searchTags.get(tagkey);
-                               sb.append("(");
-                               for(String tagValue : tagValues){
-                                       sb.append(tagValue);
-                                       sb.append(",");
-                               }
-                               sb.append(")");
-                               sb.append(",");
-                       }
-               }
-               sb.append("|tags:");
-               if(tags != null){
-                       for(Map.Entry<String, String> entry : tags.entrySet()){
-                               sb.append(entry.getKey());
-                               sb.append("=");
-                               sb.append(entry.getValue());
-                               sb.append(",");
-                       }
-               }
-               sb.append("|columns:");
-               if(qualifierValues != null){
-                       for(String qualifier : qualifierValues.keySet()){
-                               byte[] value = qualifierValues.get(qualifier);
-                               sb.append(qualifier);
-                               sb.append("=");
-                               if(value != null){
-                                       sb.append(new String(value));
-                               }
-                               sb.append(",");
-                       }
-               }
-               return sb.toString();
-       }
+    private String encodedRowkey;
+    private String prefix;
+    private String[] partitions;
+    private long timestamp;
+    private Map<String, byte[]> qualifierValues;
+
+    private Map<String, Object> extraValues;
+    private Map<String, String> tags;
+    private Map<String, List<String>> searchTags;
+    private List<byte[]> indexRowkeys;
+
+    public String getEncodedRowkey() {
+        return encodedRowkey;
+    }
+
+    public void setEncodedRowkey(String encodedRowkey) {
+        this.encodedRowkey = encodedRowkey;
+    }
+
+    public Map<String, byte[]> getQualifierValues() {
+        return qualifierValues;
+    }
+
+    public void setQualifierValues(Map<String, byte[]> qualifierValues) {
+        this.qualifierValues = qualifierValues;
+    }
+
+    public Map<String, List<String>> getSearchTags() {
+        return searchTags;
+    }
+
+    public void setSearchTags(Map<String, List<String>> searchTags) {
+        this.searchTags = searchTags;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public void setPrefix(String prefix) {
+        this.prefix = prefix;
+    }
+
+    public String[] getPartitions() {
+        return partitions;
+    }
+
+    public void setPartitions(String[] partitions) {
+        this.partitions = partitions;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+    }
+
+    public List<byte[]> getIndexRowkeys() {
+        return indexRowkeys;
+    }
+
+    public void setIndexRowkeys(List<byte[]> indexRowkeys) {
+        this.indexRowkeys = indexRowkeys;
+    }
+
+    public Map<String, Object> getExtraValues() {
+        return extraValues;
+    }
+
+    public void setExtraValues(Map<String, Object> extraValues) {
+        this.extraValues = extraValues;
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append(prefix);
+        sb.append("|");
+        
sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+        sb.append("(");
+        sb.append(timestamp);
+        sb.append(")");
+        sb.append("|searchTags:");
+        if (searchTags != null) {
+            for (String tagkey : searchTags.keySet()) {
+                sb.append(tagkey);
+                sb.append('=');
+                List<String> tagValues = searchTags.get(tagkey);
+                sb.append("(");
+                for (String tagValue : tagValues) {
+                    sb.append(tagValue);
+                    sb.append(",");
+                }
+                sb.append(")");
+                sb.append(",");
+            }
+        }
+        sb.append("|tags:");
+        if (tags != null) {
+            for (Map.Entry<String, String> entry : tags.entrySet()) {
+                sb.append(entry.getKey());
+                sb.append("=");
+                sb.append(entry.getValue());
+                sb.append(",");
+            }
+        }
+        sb.append("|columns:");
+        if (qualifierValues != null) {
+            for (String qualifier : qualifierValues.keySet()) {
+                byte[] value = qualifierValues.get(qualifier);
+                sb.append(qualifier);
+                sb.append("=");
+                if (value != null) {
+                    sb.append(new String(value));
+                }
+                sb.append(",");
+            }
+        }
+        return sb.toString();
+    }
 }
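
Finally, a small illustration of the pipe-separated debug string produced by InternalLog.toString()
above; the prefix and tag values are placeholders, and the date portion depends on DateTimeUtil's
formatting.

InternalLog log = new InternalLog();
log.setPrefix("alertdef");                                           // placeholder prefix
log.setTimestamp(0L);
log.setTags(java.util.Collections.singletonMap("site", "sandbox"));  // placeholder tag
System.out.println(log);
// prints something like:
// alertdef|<human readable date>(0)|searchTags:|tags:site=sandbox,|columns: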
