http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
index 8795ba0..a4557f2 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
@@ -35,440 +35,483 @@ import java.util.concurrent.ConcurrentHashMap;
  * static initialization of all registered entities. As of now, dynamic 
registration is not supported
  */
 public class EntityDefinitionManager {
-       private static final Logger LOG = 
LoggerFactory.getLogger(EntityDefinitionManager.class);
-       private static volatile boolean initialized = false;
-       /**
-        * using concurrent hashmap is due to the fact that entity can be 
registered any time from any thread
-        */
-       private static Map<String, EntityDefinition> entityServiceMap = new 
ConcurrentHashMap<String, EntityDefinition>();
-       private static Map<Class<? extends TaggedLogAPIEntity>, 
EntityDefinition> classMap = new ConcurrentHashMap<Class<? extends 
TaggedLogAPIEntity>, EntityDefinition>();
-       private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new 
ConcurrentHashMap<Class<?>, EntitySerDeser<?>>(); 
-       private static Map<Class<?>, Integer> _serDeserClassIDMap = new 
ConcurrentHashMap<Class<?>, Integer>(); 
-       private static Map<Integer, Class<?>> _serIDDeserClassMap = new 
ConcurrentHashMap<Integer, Class<?>>(); 
-       private static Map<String, Map<Integer, EntityDefinition>> 
entityPrefixMap = new ConcurrentHashMap<String, Map<Integer, 
EntityDefinition>>();
-       private static Map<String, Map<Integer, IndexDefinition>> 
indexPrefixMap = new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>();
-
-       static{
-               int id = 0;
-               _serDeserMap.put(NullObject.class, new NullSerDeser());
-               _serIDDeserClassMap.put(id, NullObject.class);
-               _serDeserClassIDMap.put(NullObject.class, id++);
-               
-               _serDeserMap.put(String.class, new StringSerDeser());
-               _serIDDeserClassMap.put(id, String.class);
-               _serDeserClassIDMap.put(String.class, id++);
-               
-               _serDeserMap.put(long.class, new LongSerDeser());
-               _serIDDeserClassMap.put(id, long.class);
-               _serDeserClassIDMap.put(long.class, id++);
-               
-               _serDeserMap.put(Long.class, new LongSerDeser());
-               _serIDDeserClassMap.put(id, Long.class);
-               _serDeserClassIDMap.put(Long.class, id++);
-               
-               _serDeserMap.put(int.class, new IntSerDeser());
-               _serIDDeserClassMap.put(id, int.class);
-               _serDeserClassIDMap.put(int.class, id++);
-               
-               _serDeserMap.put(Integer.class, new IntSerDeser());
-               _serIDDeserClassMap.put(id, Integer.class);
-               _serDeserClassIDMap.put(Integer.class, id++);
-               
-               _serDeserMap.put(Double.class, new DoubleSerDeser());
-               _serIDDeserClassMap.put(id, Double.class);
-               _serDeserClassIDMap.put(Double.class, id++);
-               
-               _serDeserMap.put(double.class, new DoubleSerDeser());
-               _serIDDeserClassMap.put(id, double.class);
-               _serDeserClassIDMap.put(double.class, id++);
-               
-               _serDeserMap.put(int[].class, new IntArraySerDeser());
-               _serIDDeserClassMap.put(id, int[].class);
-               _serDeserClassIDMap.put(int[].class, id++);
-               
-               _serDeserMap.put(double[].class, new DoubleArraySerDeser());
-               _serIDDeserClassMap.put(id, double[].class);
-               _serDeserClassIDMap.put(double[].class, id++);
-
-               _serDeserMap.put(double[][].class, new Double2DArraySerDeser());
-               _serIDDeserClassMap.put(id, double[][].class);
-               _serDeserClassIDMap.put(double[][].class, id++);
-               
-               _serDeserMap.put(Boolean.class, new BooleanSerDeser());
-               _serIDDeserClassMap.put(id, Boolean.class);
-               _serDeserClassIDMap.put(Boolean.class, id++);
-               
-               _serDeserMap.put(boolean.class, new BooleanSerDeser());
-               _serIDDeserClassMap.put(id, boolean.class);
-               _serDeserClassIDMap.put(boolean.class, id++);
-               
-               _serDeserMap.put(String[].class, new StringArraySerDeser());
-               _serIDDeserClassMap.put(id, String[].class);
-               _serDeserClassIDMap.put(String[].class, id++);
-               
-               _serDeserMap.put(Map.class, new MapSerDeser());
-               _serIDDeserClassMap.put(id, Map.class);
-               _serDeserClassIDMap.put(Map.class, id++);
-               
-               _serDeserMap.put(List.class, new ListSerDeser());
-               _serIDDeserClassMap.put(id, List.class);
-               _serDeserClassIDMap.put(List.class, id++);
-       }
-       
-       
-
-       @SuppressWarnings("rawtypes")
-       public static EntitySerDeser getSerDeser(Class<?> clazz){
-               return _serDeserMap.get(clazz);
-       }
-
-       /**
-        * Get internal ID by the predefined registered class
-        * @param clazz original for serialization/deserialization 
-        * @return the internal id if the input class has been registered, 
otherwise return -1
-        */
-       public static int getIDBySerDerClass(Class<?> clazz) {
-               final Integer id = _serDeserClassIDMap.get(clazz);
-               if (id == null) {
-                       return -1;
-               }
-               return id;
-       }
-       
-
-       /**
-        * Get the predefined registered class by internal ID
-        * @param id the internal class ID
-        * @return the predefined registered class, if the class hasn't been 
registered, return null
-        */
-       public static Class<?> getClassByID(int id) {
-               return _serIDDeserClassMap.get(id);
-       }
-       
-       /**
-        * it is allowed that user can register their own entity
-        * @param clazz entity class
-        * @throws IllegalArgumentException
-        */
-       public static void registerEntity(Class<? extends TaggedLogAPIEntity> 
clazz) throws IllegalArgumentException{
-               registerEntity(createEntityDefinition(clazz));
-       }
-       
-       /**
-        * it is allowed that user can register their own entity
-        * @deprecated This API is deprecated since we need to use Service 
annotation to define service name for entities
-        * @param serviceName entity service name
-        * @param clazz entity class
-        * @throws IllegalArgumentException
-        * 
-        */
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntityDefinitionManager.class);
+    private static volatile boolean initialized = false;
+    /**
+     * A concurrent hash map is used because entities can be registered at 
any time from any thread.
+     */
+    private static Map<String, EntityDefinition> entityServiceMap = new 
ConcurrentHashMap<String, EntityDefinition>();
+    private static Map<Class<? extends TaggedLogAPIEntity>, EntityDefinition> 
classMap = new ConcurrentHashMap<Class<? extends TaggedLogAPIEntity>, 
EntityDefinition>();
+    private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new 
ConcurrentHashMap<Class<?>, EntitySerDeser<?>>();
+    private static Map<Class<?>, Integer> _serDeserClassIDMap = new 
ConcurrentHashMap<Class<?>, Integer>();
+    private static Map<Integer, Class<?>> _serIDDeserClassMap = new 
ConcurrentHashMap<Integer, Class<?>>();
+    private static Map<String, Map<Integer, EntityDefinition>> entityPrefixMap 
= new ConcurrentHashMap<String, Map<Integer, EntityDefinition>>();
+    private static Map<String, Map<Integer, IndexDefinition>> indexPrefixMap = 
new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>();
+
+    static {
+        int id = 0;
+        _serDeserMap.put(NullObject.class, new NullSerDeser());
+        _serIDDeserClassMap.put(id, NullObject.class);
+        _serDeserClassIDMap.put(NullObject.class, id++);
+
+        _serDeserMap.put(String.class, new StringSerDeser());
+        _serIDDeserClassMap.put(id, String.class);
+        _serDeserClassIDMap.put(String.class, id++);
+
+        _serDeserMap.put(long.class, new LongSerDeser());
+        _serIDDeserClassMap.put(id, long.class);
+        _serDeserClassIDMap.put(long.class, id++);
+
+        _serDeserMap.put(Long.class, new LongSerDeser());
+        _serIDDeserClassMap.put(id, Long.class);
+        _serDeserClassIDMap.put(Long.class, id++);
+
+        _serDeserMap.put(int.class, new IntSerDeser());
+        _serIDDeserClassMap.put(id, int.class);
+        _serDeserClassIDMap.put(int.class, id++);
+
+        _serDeserMap.put(Integer.class, new IntSerDeser());
+        _serIDDeserClassMap.put(id, Integer.class);
+        _serDeserClassIDMap.put(Integer.class, id++);
+
+        _serDeserMap.put(Double.class, new DoubleSerDeser());
+        _serIDDeserClassMap.put(id, Double.class);
+        _serDeserClassIDMap.put(Double.class, id++);
+
+        _serDeserMap.put(double.class, new DoubleSerDeser());
+        _serIDDeserClassMap.put(id, double.class);
+        _serDeserClassIDMap.put(double.class, id++);
+
+        _serDeserMap.put(int[].class, new IntArraySerDeser());
+        _serIDDeserClassMap.put(id, int[].class);
+        _serDeserClassIDMap.put(int[].class, id++);
+
+        _serDeserMap.put(double[].class, new DoubleArraySerDeser());
+        _serIDDeserClassMap.put(id, double[].class);
+        _serDeserClassIDMap.put(double[].class, id++);
+
+        _serDeserMap.put(double[][].class, new Double2DArraySerDeser());
+        _serIDDeserClassMap.put(id, double[][].class);
+        _serDeserClassIDMap.put(double[][].class, id++);
+
+        _serDeserMap.put(Boolean.class, new BooleanSerDeser());
+        _serIDDeserClassMap.put(id, Boolean.class);
+        _serDeserClassIDMap.put(Boolean.class, id++);
+
+        _serDeserMap.put(boolean.class, new BooleanSerDeser());
+        _serIDDeserClassMap.put(id, boolean.class);
+        _serDeserClassIDMap.put(boolean.class, id++);
+
+        _serDeserMap.put(String[].class, new StringArraySerDeser());
+        _serIDDeserClassMap.put(id, String[].class);
+        _serDeserClassIDMap.put(String[].class, id++);
+
+        _serDeserMap.put(Map.class, new MapSerDeser());
+        _serIDDeserClassMap.put(id, Map.class);
+        _serDeserClassIDMap.put(Map.class, id++);
+
+        _serDeserMap.put(List.class, new ListSerDeser());
+        _serIDDeserClassMap.put(id, List.class);
+        _serDeserClassIDMap.put(List.class, id++);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static EntitySerDeser getSerDeser(Class<?> clazz) {
+        return _serDeserMap.get(clazz);
+    }
+
+    /**
+     * Gets the internal ID of a class registered for serialization/deserialization.
+     * 
+     * @param clazz the class to look up
+     * @return the internal ID if the input class has been registered, 
otherwise -1
+     */
+    public static int getIDBySerDerClass(Class<?> clazz) {
+        final Integer id = _serDeserClassIDMap.get(clazz);
+        if (id == null) {
+            return -1;
+        }
+        return id;
+    }
+
+    /**
+     * Gets the registered class for an internal ID.
+     * 
+     * @param id the internal class ID
+     * @return the registered class, or null if no class has been registered 
under that ID
+     */
+    public static Class<?> getClassByID(int id) {
+        return _serIDDeserClassMap.get(id);
+    }
+
+    /**
+     * Users may register their own entity classes.
+     * 
+     * @param clazz entity class
+     * @throws IllegalArgumentException
+     */
+    public static void registerEntity(Class<? extends TaggedLogAPIEntity> 
clazz)
+        throws IllegalArgumentException {
+        registerEntity(createEntityDefinition(clazz));
+    }
+
+    /**
+     * Users may register their own entity class under an explicit service name.
+     * 
+     * @deprecated This API is deprecated since we need to use Service 
annotation to define service name for
+     *             entities
+     * @param serviceName entity service name
+     * @param clazz entity class
+     * @throws IllegalArgumentException
+     */
     @Deprecated
-       public static void registerEntity(String serviceName, Class<? extends 
TaggedLogAPIEntity> clazz) throws IllegalArgumentException{
-               registerEntity(serviceName, createEntityDefinition(clazz));
-       }
-       
-       /**
-        * it is allowed that user can register their own entity definition
-        * @param entityDef entity definition
-        * @throws IllegalArgumentException
-        */
-       public static void registerEntity(EntityDefinition entityDef) {
-               registerEntity(entityDef.getService(), entityDef);
-       }
-       
-       /**
-        * it is allowed that user can register their own entity definition
-        * @deprecated This API is deprecated since we need to use Service 
annotation to define service name for entities. 
-        * 
-        * @param entityDef entity definition
-        * @throws IllegalArgumentException
-        */
-       public static void registerEntity(String serviceName, EntityDefinition 
entityDef) {
-               final String table = entityDef.getTable();
-               if (entityServiceMap.containsKey(serviceName)) {
-                       final EntityDefinition existing = 
entityServiceMap.get(serviceName);
-                       if (entityDef.getClass().equals(existing.getClass())) {
-                               return;
-                       }
-                       throw new IllegalArgumentException("Service " + 
serviceName + " has already been registered by " + 
existing.getClass().getName() + ", so class " + entityDef.getClass() + " can 
NOT be registered");
-               }
-               synchronized (EntityDefinitionManager.class) {
-                       checkPrefix(entityDef);
-                       entityServiceMap.put(serviceName, entityDef);
-                       Map<Integer, EntityDefinition> entityHashMap = 
entityPrefixMap.get(table);
-                       if (entityHashMap == null) {
-                               entityHashMap = new ConcurrentHashMap<Integer, 
EntityDefinition>();
-                               entityPrefixMap.put(table, entityHashMap);
-                       }
-                       entityHashMap.put(entityDef.getPrefix().hashCode(), 
entityDef);
-                       final IndexDefinition[] indexes = 
entityDef.getIndexes();
-                       if (indexes != null) {
-                               for (IndexDefinition index : indexes) {
-                                       Map<Integer, IndexDefinition> 
indexHashMap = indexPrefixMap.get(table);
-                                       if (indexHashMap == null) {
-                                               indexHashMap = new 
ConcurrentHashMap<Integer, IndexDefinition>();
-                                               indexPrefixMap.put(table, 
indexHashMap);
-                                       }
-                                       
indexHashMap.put(index.getIndexPrefix().hashCode(), index);
-                               }
-                       }
-                       classMap.put(entityDef.getEntityClass(), entityDef);
-               }
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(entityDef.getEntityClass().getSimpleName() + " entity 
registered successfully, table name: " + entityDef.getTable() +
-                                       ", prefix: " + entityDef.getPrefix() + 
", service: " + serviceName + ", CF: " + entityDef.getColumnFamily());
-        }else{
-            LOG.info(String.format("Registered %s (%s)", 
entityDef.getEntityClass().getSimpleName(), serviceName));
+    public static void registerEntity(String serviceName, Class<? extends 
TaggedLogAPIEntity> clazz)
+        throws IllegalArgumentException {
+        registerEntity(serviceName, createEntityDefinition(clazz));
+    }
+
+    /**
+     * Users may register their own entity definitions.
+     * 
+     * @param entityDef entity definition
+     * @throws IllegalArgumentException
+     */
+    public static void registerEntity(EntityDefinition entityDef) {
+        registerEntity(entityDef.getService(), entityDef);
+    }
+
+    /**
+     * Users may register their own entity definition under an explicit service name.
+     * 
+     * @deprecated This API is deprecated since we need to use Service 
annotation to define service name for
+     *             entities.
+     * @param entityDef entity definition
+     * @throws IllegalArgumentException
+     */
+    @Deprecated
+    public static void registerEntity(String serviceName, EntityDefinition 
entityDef) {
+        final String table = entityDef.getTable();
+        if (entityServiceMap.containsKey(serviceName)) {
+            final EntityDefinition existing = 
entityServiceMap.get(serviceName);
+            if (entityDef.getClass().equals(existing.getClass())) {
+                return;
+            }
+            throw new IllegalArgumentException("Service " + serviceName + " 
has already been registered by "
+                                               + existing.getClass().getName() 
+ ", so class "
+                                               + entityDef.getClass() + " can 
NOT be registered");
+        }
+        synchronized (EntityDefinitionManager.class) {
+            checkPrefix(entityDef);
+            entityServiceMap.put(serviceName, entityDef);
+            Map<Integer, EntityDefinition> entityHashMap = 
entityPrefixMap.get(table);
+            if (entityHashMap == null) {
+                entityHashMap = new ConcurrentHashMap<Integer, 
EntityDefinition>();
+                entityPrefixMap.put(table, entityHashMap);
+            }
+            entityHashMap.put(entityDef.getPrefix().hashCode(), entityDef);
+            final IndexDefinition[] indexes = entityDef.getIndexes();
+            if (indexes != null) {
+                for (IndexDefinition index : indexes) {
+                    Map<Integer, IndexDefinition> indexHashMap = 
indexPrefixMap.get(table);
+                    if (indexHashMap == null) {
+                        indexHashMap = new ConcurrentHashMap<Integer, 
IndexDefinition>();
+                        indexPrefixMap.put(table, indexHashMap);
+                    }
+                    indexHashMap.put(index.getIndexPrefix().hashCode(), index);
+                }
+            }
+            classMap.put(entityDef.getEntityClass(), entityDef);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(entityDef.getEntityClass().getSimpleName()
+                      + " entity registered successfully, table name: " + 
entityDef.getTable() + ", prefix: "
+                      + entityDef.getPrefix() + ", service: " + serviceName + 
", CF: "
+                      + entityDef.getColumnFamily());
+        } else {
+            LOG.info(String.format("Registered %s (%s)", 
entityDef.getEntityClass().getSimpleName(),
+                                   serviceName));
         }
-       }
-
-       private static void checkPrefix(EntityDefinition entityDef) {
-               final Integer entityPrefixHashcode = 
entityDef.getPrefix().hashCode();
-               if (entityPrefixMap.containsKey(entityDef.getTable())) {
-                       final Map<Integer, EntityDefinition> entityHashMap = 
entityPrefixMap.get(entityDef.getTable());
-                       if (entityHashMap.containsKey(entityPrefixHashcode) && 
(!entityDef.equals(entityHashMap.get(entityPrefixHashcode)))) {
-                               throw new IllegalArgumentException("Failed to 
register entity " + entityDef.getClass().getName() + ", because of the prefix 
hash code conflict! The entity prefix " + entityDef.getPrefix() + " has already 
been registered by entity service " + 
entityHashMap.get(entityPrefixHashcode).getService());
-                       }
-                       final IndexDefinition[] indexes = 
entityDef.getIndexes();
-                       if (indexes != null) {
-                               for (IndexDefinition index : indexes) {
-                                       final Integer indexPrefixHashcode = 
index.getIndexPrefix().hashCode();
-                                       if 
(entityHashMap.containsKey(indexPrefixHashcode)) {
-                                               throw new 
IllegalArgumentException("Failed to register entity " + 
entityDef.getClass().getName() + ", because of the prefix hash code conflict! 
The index prefix " + index.getIndexPrefix() + " has already been registered by 
entity " + entityHashMap.get(indexPrefixHashcode).getService());
-                                       }
-                                       final Map<Integer, IndexDefinition> 
indexHashMap = indexPrefixMap.get(entityDef.getTable());
-                                       if (indexHashMap != null && 
indexHashMap.containsKey(indexPrefixHashcode) && 
(!index.equals(indexHashMap.get(indexPrefixHashcode)))) {
-                                               throw new 
IllegalArgumentException("Failed to register entity " + 
entityDef.getClass().getName() + ", because of the prefix hash code conflict! 
The index prefix " + index.getIndexPrefix() + " has already been registered by 
entity " + 
indexHashMap.get(indexPrefixHashcode).getEntityDefinition().getService());
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * Get entity definition by name
-        * @param serviceName
-        * @return
-        * @throws IllegalAccessException 
-        * @throws InstantiationException 
-        */
-       public static EntityDefinition getEntityByServiceName(String 
serviceName) throws InstantiationException, IllegalAccessException{
-               checkInit();
-               return entityServiceMap.get(serviceName);
-       }
-       
-       public static EntityDefinition getEntityDefinitionByEntityClass(Class<? 
extends TaggedLogAPIEntity> clazz) throws InstantiationException, 
IllegalAccessException {
-               checkInit();
-               return classMap.get(clazz);
-       }
-
-       private static void checkInit() throws InstantiationException, 
IllegalAccessException {
-               if (!initialized) {
-                       synchronized (EntityDefinitionManager.class) {
-                               if (!initialized) {
-                                       EntityRepositoryScanner.scan();
-                                       initialized = true;
-                               }
-                       }
-               }
-       }
-
-       public static void load() throws IllegalAccessException, 
InstantiationException {
-               checkInit();
-       }
-
-       /**
-        * Users can register their own field SerDeser
-        * @param clazz class of the SerDeser 
-        * @param entitySerDeser entity or field SerDeser
-        * @throws IllegalArgumentException
-        */
-       public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> 
entitySerDeser) {
-               _serDeserMap.put(clazz, entitySerDeser);
-       }
-
-       /**
-        * Check whether the entity class is time series, false by default
-        * @param clazz
-        * @return
-        */
-       public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> 
clazz){
-               TimeSeries ts = clazz.getAnnotation(TimeSeries.class);
-               return ts != null && ts.value();
-       }
-
-       @SuppressWarnings("unchecked")
-       public static EntityDefinition createEntityDefinition(Class<? extends 
TaggedLogAPIEntity> cls) {
-               
-               final EntityDefinition ed = new EntityDefinition();
-
-               ed.setEntityClass(cls);
-               // parse cls' annotations
-               Table table = cls.getAnnotation(Table.class);
-               if(table == null || table.value().isEmpty()){
-                       throw new IllegalArgumentException("Entity class must 
have a non-empty table name annotated with @Table");
-               }
-               String tableName = table.value();
-               
if(EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()){
-                       tableName = EagleConfigFactory.load().getEnv() + "_" + 
tableName;
-               }
-               ed.setTable(tableName);
-               
-               ColumnFamily family = cls.getAnnotation(ColumnFamily.class);
-               if(family == null || family.value().isEmpty()){
-                       throw new IllegalArgumentException("Entity class must 
have a non-empty column family name annotated with @ColumnFamily");
-               }
-               ed.setColumnFamily(family.value());
-               
-               Prefix prefix = cls.getAnnotation(Prefix.class);
-               if(prefix == null || prefix.value().isEmpty()){
-                       throw new IllegalArgumentException("Entity class must 
have a non-empty prefix name annotated with @Prefix");
-               }
-               ed.setPrefix(prefix.value());
-               
-               TimeSeries ts = cls.getAnnotation(TimeSeries.class);
-               if(ts == null){
-                       throw new IllegalArgumentException("Entity class must 
have a non-empty timeseries name annotated with @TimeSeries");
-               }
-               ed.setTimeSeries(ts.value());
-               
-               Service service = cls.getAnnotation(Service.class);
-               if(service == null || service.value().isEmpty()){
-                       ed.setService(cls.getSimpleName());
-               } else {
-                       ed.setService(service.value());
-               }
-
-               Metric m = cls.getAnnotation(Metric.class);
-               Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, 
Class<?>>();
-               if(m != null){
-                       // metric has to be timeseries
-                       if(!ts.value()){
-                               throw new IllegalArgumentException("Metric 
entity must be time series as well");
-                       }
-                       MetricDefinition md = new MetricDefinition();
-                       md.setInterval(m.interval());
-                       ed.setMetricDefinition(md);
-               }
-
-               java.lang.reflect.Field[] fields = cls.getDeclaredFields();
-               for(java.lang.reflect.Field f : fields){
-                       Column column = f.getAnnotation(Column.class); 
-                       if(column == null || column.value().isEmpty()){
-                               continue;
-                       }
-                       Class<?> fldCls = f.getType();
-                       // intrusive check field type for metric entity
-                       checkFieldTypeForMetric(ed.getMetricDefinition(), 
f.getName(), fldCls, dynamicFieldTypes);
-                       Qualifier q = new Qualifier();
-                       q.setDisplayName(f.getName());
-                       q.setQualifierName(column.value());
-                       EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); 
-                       if(serDeser == null){
-//                             throw new 
IllegalArgumentException(fldCls.getName() + " in field " + f.getName() +
-//                                             " of entity " + 
cls.getSimpleName() + " has no serializer associated ");
-                               serDeser = DefaultJavaObjctSerDeser.INSTANCE;
-                       }
-
-                       q.setSerDeser((EntitySerDeser<Object>)serDeser);
-                       ed.getQualifierNameMap().put(q.getQualifierName(), q);
-                       ed.getDisplayNameMap().put(q.getDisplayName(), q);
-                       // TODO: should refine rules, consider fields like 
"hCol", getter method should be gethCol() according to 
org.apache.commons.beanutils.PropertyUtils
-                       final String propertyName = 
f.getName().substring(0,1).toUpperCase() + f.getName().substring(1);
-                       String getterName = "get" + propertyName;
-                       try {
-                               Method method = cls.getMethod(getterName);
-                               ed.getQualifierGetterMap().put(f.getName(), 
method);
-                       } catch (Exception e) {
-                               // Check if the type is boolean
-                               getterName = "is" + propertyName;
-                               try {
-                                       Method method = 
cls.getMethod(getterName);
-                                       
ed.getQualifierGetterMap().put(f.getName(), method);
-                               } catch (Exception e1) {
-                                       throw new 
IllegalArgumentException("Field " + f.getName() + " hasn't defined valid getter 
method: " + getterName, e);
-                               }
-                       }
-                       if(LOG.isDebugEnabled()) LOG.debug("Field registered " 
+ q);
-               }
-
-               // TODO: Lazy create because not used at all
-               // dynamically create bean class
-               if(ed.getMetricDefinition() != null){
-                       Class<?> metricCls = 
createDynamicClassForMetric(cls.getName()+"_SingleTimestamp", 
dynamicFieldTypes);
-                       
ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls);
-               }
-               
-               final Partition partition = cls.getAnnotation(Partition.class);
-               if (partition != null) {
-                       final String[] partitions = partition.value();
-                       ed.setPartitions(partitions);
-                       // Check if partition fields are all tag fields. 
Partition field can't be column field, must be tag field.
-                       for (String part : partitions) {
-                               if (!ed.isTag(part)) {
-                                       throw new 
IllegalArgumentException("Partition field can't be column field, must be tag 
field. "
-                                                       + "Partition name: " + 
part);
-                               }
-                       }
-               }
-               
-               final Indexes indexes = cls.getAnnotation(Indexes.class);
-               if (indexes != null) {
-                       final Index[] inds = indexes.value();
-                       final IndexDefinition[] indexDefinitions = new 
IndexDefinition[inds.length];
-                       for (int i = 0; i < inds.length; ++i) {
-                               final Index ind = inds[i];
-                               indexDefinitions[i] = new IndexDefinition(ed, 
ind);
-                       }
-                       ed.setIndexes(indexDefinitions);
-               }
-               
-               final ServicePath path = cls.getAnnotation(ServicePath.class);
-               if (path != null) {
-                       if (path.path() != null && (!path.path().isEmpty())) {
-                               ed.setServiceCreationPath(path.path());
-                       }
-               }
-
-               final Tags tags = cls.getAnnotation(Tags.class);
-               if(tags != null) {
-                       String[] tagNames = tags.value();
-                       ed.setTags(tagNames);
-               }
-
-               return ed;
-       }
-       
-       private static void checkFieldTypeForMetric(MetricDefinition md, String 
fieldName, Object fldCls, Map<String, Class<?>> dynamicFieldTypes){
-               if(md != null){
-                       if(fldCls.equals(int[].class)){
-                               dynamicFieldTypes.put(fieldName, int.class);
-                               return;
-                       }else if(fldCls.equals(long[].class)){
-                               dynamicFieldTypes.put(fieldName, long.class);
-                               return;
-                       }else if(fldCls.equals(double[].class)){
-                               dynamicFieldTypes.put(fieldName, double.class);
-                               return;
-                       }
-                       throw new IllegalArgumentException("Fields for metric 
entity must be one of int[], long[] or double[]");
-               }
-       }
-       
-       private static Class<?> createDynamicClassForMetric(final String 
className, Map<String, Class<?>> dynamicFieldTypes){
-               BeanGenerator beanGenerator = new BeanGenerator();
-               beanGenerator.setNamingPolicy(new NamingPolicy(){
-               @Override 
-               public String getClassName(String prefix,String source, Object 
key, Predicate names){
-                   return className;
-               }});
-           BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes);
-           beanGenerator.setSuperclass(TaggedLogAPIEntity.class);
-           return (Class<?>) beanGenerator.createClass();
-       }
-       
-       public static Map<String, EntityDefinition> entities() throws Exception{
-               checkInit();
-               return entityServiceMap;
-       }
+    }
+
+    /**
+     * Rejects an entity definition whose prefix, or whose index prefixes, collide by hash code with a
+     * prefix already recorded for the same table. Nothing is checked when entityPrefixMap has no entry
+     * for the table of entityDef.
+     *
+     * @param entityDef entity definition to check
+     * @throws IllegalArgumentException if the hash code of the entity prefix is already mapped to a
+     *         different entity definition, or the hash code of an index prefix is already mapped to an
+     *         entity definition or to a different index definition
+     */
+    private static void checkPrefix(EntityDefinition entityDef) {
+        final Integer entityPrefixHashcode = entityDef.getPrefix().hashCode();
+        if (entityPrefixMap.containsKey(entityDef.getTable())) {
+            final Map<Integer, EntityDefinition> entityHashMap = 
entityPrefixMap.get(entityDef.getTable());
+            // the same definition found under its own hash is not a conflict
+            if (entityHashMap.containsKey(entityPrefixHashcode)
+                && 
(!entityDef.equals(entityHashMap.get(entityPrefixHashcode)))) {
+                // NOTE(review): entityDef.getClass() is the class of the definition object, so the
+                // message does not name the entity class - confirm whether getEntityClass() was meant
+                throw new IllegalArgumentException("Failed to register entity 
" + entityDef.getClass()
+                    .getName() + ", because of the prefix hash code conflict! 
The entity prefix "
+                                                   + entityDef.getPrefix()
+                                                   + " has already been 
registered by entity service "
+                                                   + 
entityHashMap.get(entityPrefixHashcode).getService());
+            }
+            final IndexDefinition[] indexes = entityDef.getIndexes();
+            if (indexes != null) {
+                for (IndexDefinition index : indexes) {
+                    final Integer indexPrefixHashcode = 
index.getIndexPrefix().hashCode();
+                    // an index prefix must not share a hash code with any entity prefix of the table
+                    if (entityHashMap.containsKey(indexPrefixHashcode)) {
+                        throw new IllegalArgumentException("Failed to register 
entity " + entityDef.getClass()
+                            .getName() + ", because of the prefix hash code 
conflict! The index prefix "
+                                                           + 
index.getIndexPrefix()
+                                                           + " has already 
been registered by entity "
+                                                           + 
entityHashMap.get(indexPrefixHashcode)
+                                                               .getService());
+                    }
+                    // nor with a different index already recorded for the table
+                    final Map<Integer, IndexDefinition> indexHashMap = 
indexPrefixMap
+                        .get(entityDef.getTable());
+                    if (indexHashMap != null && 
indexHashMap.containsKey(indexPrefixHashcode)
+                        && 
(!index.equals(indexHashMap.get(indexPrefixHashcode)))) {
+                        throw new IllegalArgumentException("Failed to register 
entity " + entityDef.getClass()
+                            .getName() + ", because of the prefix hash code 
conflict! The index prefix "
+                                                           + 
index.getIndexPrefix()
+                                                           + " has already 
been registered by entity "
+                                                           + 
indexHashMap.get(indexPrefixHashcode)
+                                                               
.getEntityDefinition().getService());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up an entity definition by service name, running checkInit() first.
+     *
+     * @param serviceName key to look up in entityServiceMap
+     * @return the value entityServiceMap holds for serviceName, as returned by Map.get
+     * @throws IllegalAccessException declared because checkInit() declares it
+     * @throws InstantiationException declared because checkInit() declares it
+     */
+    public static EntityDefinition getEntityByServiceName(String serviceName)
+        throws InstantiationException, IllegalAccessException {
+        checkInit();
+        return entityServiceMap.get(serviceName);
+    }
+
+    /**
+     * Looks up an entity definition by entity class, running checkInit() first.
+     *
+     * @param clazz key to look up in classMap
+     * @return the value classMap holds for clazz, as returned by Map.get
+     * @throws InstantiationException declared because checkInit() declares it
+     * @throws IllegalAccessException declared because checkInit() declares it
+     */
+    public static EntityDefinition getEntityDefinitionByEntityClass(Class<? 
extends TaggedLogAPIEntity> clazz)
+        throws InstantiationException, IllegalAccessException {
+        checkInit();
+        return classMap.get(clazz);
+    }
+
+    /**
+     * Calls EntityRepositoryScanner.scan() if the initialized flag is not yet set, then sets the flag.
+     * The flag is tested once without the lock and once more while holding the lock on
+     * EntityDefinitionManager.class. If scan() throws, the flag stays false and a later call scans again.
+     */
+    private static void checkInit() throws InstantiationException, 
IllegalAccessException {
+        if (!initialized) {
+            synchronized (EntityDefinitionManager.class) {
+                // test again: the flag may have been set while this thread waited for the lock
+                if (!initialized) {
+                    EntityRepositoryScanner.scan();
+                    initialized = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Public entry point that only calls checkInit().
+     */
+    public static void load() throws IllegalAccessException, 
InstantiationException {
+        checkInit();
+    }
+
+    /**
+     * Lets callers register their own field SerDeser. The entry is stored in _serDeserMap under clazz
+     * and replaces any entry previously stored under the same key.
+     *
+     * @param clazz class used as the lookup key; createEntityDefinition looks up the type of each
+     *        column field
+     * @param entitySerDeser entity or field SerDeser to store for clazz
+     */
+    public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> 
entitySerDeser) {
+        _serDeserMap.put(clazz, entitySerDeser);
+    }
+
+    /**
+     * Checks whether the entity class is time series, false by default.
+     *
+     * @param clazz entity class to inspect
+     * @return true only if clazz carries a TimeSeries annotation whose value() is true; false if the
+     *         annotation is absent or its value() is false
+     */
+    public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> 
clazz) {
+        TimeSeries ts = clazz.getAnnotation(TimeSeries.class);
+        return ts != null && ts.value();
+    }
+
+    /**
+     * Builds an EntityDefinition from the annotations found on cls and on its declared fields.
+     * Table, ColumnFamily and Prefix must be present with a non-empty value, and TimeSeries must be
+     * present. Service is optional and falls back to the simple class name. Metric, Partition, Indexes,
+     * ServicePath and Tags are optional. Each declared field with a non-empty Column annotation becomes
+     * a Qualifier and must have a getter that cls.getMethod can find, named get&lt;Name&gt; or
+     * is&lt;Name&gt;.
+     *
+     * @param cls entity class to inspect
+     * @return the populated entity definition
+     * @throws IllegalArgumentException if a required annotation is missing or empty, a Metric entity
+     *         is not time series or has a column field that is not int[], long[] or double[], a column
+     *         field has no getter, or a partition name is not a tag
+     */
+    @SuppressWarnings("unchecked")
+    public static EntityDefinition createEntityDefinition(Class<? extends 
TaggedLogAPIEntity> cls) {
+
+        final EntityDefinition ed = new EntityDefinition();
+
+        ed.setEntityClass(cls);
+        // parse cls' annotations
+        Table table = cls.getAnnotation(Table.class);
+        if (table == null || table.value().isEmpty()) {
+            throw new IllegalArgumentException("Entity class must have a 
non-empty table name annotated with @Table");
+        }
+        String tableName = table.value();
+        // optionally prepend "<env>_" to the table name, as decided by the loaded configuration
+        if (EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()) {
+            tableName = EagleConfigFactory.load().getEnv() + "_" + tableName;
+        }
+        ed.setTable(tableName);
+
+        ColumnFamily family = cls.getAnnotation(ColumnFamily.class);
+        if (family == null || family.value().isEmpty()) {
+            throw new IllegalArgumentException("Entity class must have a 
non-empty column family name annotated with @ColumnFamily");
+        }
+        ed.setColumnFamily(family.value());
+
+        Prefix prefix = cls.getAnnotation(Prefix.class);
+        if (prefix == null || prefix.value().isEmpty()) {
+            throw new IllegalArgumentException("Entity class must have a 
non-empty prefix name annotated with @Prefix");
+        }
+        ed.setPrefix(prefix.value());
+
+        // only the presence of the annotation is checked here; its value is a boolean
+        TimeSeries ts = cls.getAnnotation(TimeSeries.class);
+        if (ts == null) {
+            throw new IllegalArgumentException("Entity class must have a 
non-empty timeseries name annotated with @TimeSeries");
+        }
+        ed.setTimeSeries(ts.value());
+
+        // service name falls back to the simple class name when @Service is absent or empty
+        Service service = cls.getAnnotation(Service.class);
+        if (service == null || service.value().isEmpty()) {
+            ed.setService(cls.getSimpleName());
+        } else {
+            ed.setService(service.value());
+        }
+
+        Metric m = cls.getAnnotation(Metric.class);
+        // filled by checkFieldTypeForMetric below; stays empty for non-metric entities
+        Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, 
Class<?>>();
+        if (m != null) {
+            // metric has to be timeseries
+            if (!ts.value()) {
+                throw new IllegalArgumentException("Metric entity must be time 
series as well");
+            }
+            MetricDefinition md = new MetricDefinition();
+            md.setInterval(m.interval());
+            ed.setMetricDefinition(md);
+        }
+
+        // only fields declared directly on cls are inspected
+        java.lang.reflect.Field[] fields = cls.getDeclaredFields();
+        for (java.lang.reflect.Field f : fields) {
+            Column column = f.getAnnotation(Column.class);
+            // fields without @Column, or with an empty column name, are skipped
+            if (column == null || column.value().isEmpty()) {
+                continue;
+            }
+            Class<?> fldCls = f.getType();
+            // intrusive check field type for metric entity
+            checkFieldTypeForMetric(ed.getMetricDefinition(), f.getName(), 
fldCls, dynamicFieldTypes);
+            Qualifier q = new Qualifier();
+            q.setDisplayName(f.getName());
+            q.setQualifierName(column.value());
+            EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls);
+            if (serDeser == null) {
+                // throw new IllegalArgumentException(fldCls.getName() + " in 
field " + f.getName() +
+                // " of entity " + cls.getSimpleName() + " has no serializer 
associated ");
+                serDeser = DefaultJavaObjctSerDeser.INSTANCE;
+            }
+
+            q.setSerDeser((EntitySerDeser<Object>)serDeser);
+            ed.getQualifierNameMap().put(q.getQualifierName(), q);
+            ed.getDisplayNameMap().put(q.getDisplayName(), q);
+            // TODO: should refine rules, consider fields like "hCol", getter 
method should be gethCol()
+            // according to org.apache.commons.beanutils.PropertyUtils
+            final String propertyName = f.getName().substring(0, 
1).toUpperCase() + f.getName().substring(1);
+            String getterName = "get" + propertyName;
+            try {
+                Method method = cls.getMethod(getterName);
+                ed.getQualifierGetterMap().put(f.getName(), method);
+            } catch (Exception e) {
+                // Check if the type is boolean
+                getterName = "is" + propertyName;
+                try {
+                    Method method = cls.getMethod(getterName);
+                    ed.getQualifierGetterMap().put(f.getName(), method);
+                } catch (Exception e1) {
+                    // NOTE(review): the message names the "is" getter while the attached cause is
+                    // the failure of the "get" lookup - confirm this is the intended report
+                    throw new IllegalArgumentException("Field " + f.getName()
+                                                       + " hasn't defined 
valid getter method: " + getterName,
+                                                       e);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Field registered " + q);
+            }
+        }
+
+        // TODO: Lazy create because not used at all
+        // dynamically create bean class
+        if (ed.getMetricDefinition() != null) {
+            Class<?> metricCls = createDynamicClassForMetric(cls.getName() + 
"_SingleTimestamp",
+                                                             
dynamicFieldTypes);
+            ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls);
+        }
+
+        final Partition partition = cls.getAnnotation(Partition.class);
+        if (partition != null) {
+            final String[] partitions = partition.value();
+            ed.setPartitions(partitions);
+            // Check if partition fields are all tag fields. Partition field 
can't be column field, must be
+            // tag field.
+            // NOTE(review): ed.isTag is consulted here, and by the IndexDefinition constructor below,
+            // before ed.setTags is called at the end of this method - confirm isTag does not rely on it
+            for (String part : partitions) {
+                if (!ed.isTag(part)) {
+                    throw new IllegalArgumentException("Partition field can't 
be column field, must be tag field. "
+                                                       + "Partition name: " + 
part);
+                }
+            }
+        }
+
+        // one IndexDefinition per @Index, kept in declaration order
+        final Indexes indexes = cls.getAnnotation(Indexes.class);
+        if (indexes != null) {
+            final Index[] inds = indexes.value();
+            final IndexDefinition[] indexDefinitions = new 
IndexDefinition[inds.length];
+            for (int i = 0; i < inds.length; ++i) {
+                final Index ind = inds[i];
+                indexDefinitions[i] = new IndexDefinition(ed, ind);
+            }
+            ed.setIndexes(indexDefinitions);
+        }
+
+        // the service creation path is set only when @ServicePath gives a non-empty path
+        final ServicePath path = cls.getAnnotation(ServicePath.class);
+        if (path != null) {
+            if (path.path() != null && (!path.path().isEmpty())) {
+                ed.setServiceCreationPath(path.path());
+            }
+        }
+
+        final Tags tags = cls.getAnnotation(Tags.class);
+        if (tags != null) {
+            String[] tagNames = tags.value();
+            ed.setTags(tagNames);
+        }
+
+        return ed;
+    }
+
+    /**
+     * For a metric entity, records the element type of an array-typed field. Does nothing when md is
+     * null, that is, when the entity has no metric definition.
+     *
+     * @param md metric definition of the entity, or null
+     * @param fieldName name under which the element type is stored
+     * @param fldCls declared type of the field; compared against int[], long[] and double[]
+     * @param dynamicFieldTypes output map that receives fieldName mapped to int, long or double
+     * @throws IllegalArgumentException if md is not null and fldCls is none of the three array types
+     */
+    private static void checkFieldTypeForMetric(MetricDefinition md, String 
fieldName, Object fldCls,
+                                                Map<String, Class<?>> 
dynamicFieldTypes) {
+        if (md != null) {
+            if (fldCls.equals(int[].class)) {
+                dynamicFieldTypes.put(fieldName, int.class);
+                return;
+            } else if (fldCls.equals(long[].class)) {
+                dynamicFieldTypes.put(fieldName, long.class);
+                return;
+            } else if (fldCls.equals(double[].class)) {
+                dynamicFieldTypes.put(fieldName, double.class);
+                return;
+            }
+            throw new IllegalArgumentException("Fields for metric entity must 
be one of int[], long[] or double[]");
+        }
+    }
+
+    /**
+     * Generates a class with BeanGenerator that extends TaggedLogAPIEntity and has one property per
+     * entry of dynamicFieldTypes.
+     *
+     * @param className name the installed naming policy returns for the generated class
+     * @param dynamicFieldTypes property names mapped to property types
+     * @return the class returned by BeanGenerator.createClass()
+     */
+    private static Class<?> createDynamicClassForMetric(final String className,
+                                                        Map<String, Class<?>> 
dynamicFieldTypes) {
+        BeanGenerator beanGenerator = new BeanGenerator();
+        // the policy ignores its arguments and always answers with className
+        beanGenerator.setNamingPolicy(new NamingPolicy() {
+            @Override
+            public String getClassName(String prefix, String source, Object 
key, Predicate names) {
+                return className;
+            }
+        });
+        BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes);
+        beanGenerator.setSuperclass(TaggedLogAPIEntity.class);
+        return (Class<?>)beanGenerator.createClass();
+    }
+
+    /**
+     * Returns entityServiceMap after running checkInit(). The map itself is returned, not a copy.
+     *
+     * @return the map of entity definitions keyed by String
+     * @throws Exception declared by this method; checkInit() may throw InstantiationException or
+     *         IllegalAccessException
+     */
+    public static Map<String, EntityDefinition> entities() throws Exception {
+        checkInit();
+        return entityServiceMap;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
index 25d55e0..08caeab 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
@@ -17,7 +17,9 @@
 package org.apache.eagle.log.entity.meta;
 
 public interface EntitySerDeser<T> {
-       public T deserialize(byte[] bytes);
-       public byte[] serialize(T t);
-       public Class<T> type();
+    /**
+     * Produces a value of type T from a byte array.
+     */
+    public T deserialize(byte[] bytes);
+
+    /**
+     * Produces a byte array from a value of type T.
+     */
+    public byte[] serialize(T t);
+
+    /**
+     * Returns a Class object for T.
+     */
+    public Class<T> type();
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
index a7ec4e4..1e1ca48 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
@@ -26,54 +26,54 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class EntitySerDeserializer {
-       private static final Logger LOG = 
LoggerFactory.getLogger(EntitySerDeserializer.class);
-       
-       // TODO throws seperate exceptions
-       @SuppressWarnings("unchecked")
-       public <T> T readValue(Map<String, byte[]> qualifierValues, 
EntityDefinition ed) throws Exception{
-               Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass();
-               if(clazz == null){
-                       throw new NullPointerException("Entity class of service 
"+ed.getService()+" is null");
-               }
-               TaggedLogAPIEntity obj = clazz.newInstance();
-               Map<String, Qualifier> map = ed.getQualifierNameMap();
-               for(Map.Entry<String, byte[]> entry : 
qualifierValues.entrySet()){
-                       Qualifier q = map.get(entry.getKey());
-                       if(q == null){
-                               // if it's not pre-defined qualifier, it must 
be tag unless it's a bug
-                               if(obj.getTags() == null){
-                                       obj.setTags(new HashMap<String, 
String>());
-                               }
-                               obj.getTags().put(entry.getKey(), new 
StringSerDeser().deserialize(entry.getValue()));
-                               continue;
-                       }
-                       
-                       // TODO performance loss compared with new operator
-                       // parse different types of qualifiers
-                       String fieldName = q.getDisplayName();
-                       PropertyDescriptor pd = 
PropertyUtils.getPropertyDescriptor(obj, fieldName);
-                       if(entry.getValue() != null){
-                               Object args = 
q.getSerDeser().deserialize(entry.getValue());
-                               pd.getWriteMethod().invoke(obj, args);
-//                             if (logger.isDebugEnabled()) {
-//                                     logger.debug(entry.getKey() + ":" + 
args + " is deserialized");
-//                             }
-                       }
-               }
-               return (T)obj;
-       }
-       
-       public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, 
EntityDefinition ed) throws Exception{
-               Map<String, byte[]> qualifierValues = new HashMap<String, 
byte[]>();
-               // iterate all modified qualifiers
-               for(String fieldName : entity.modifiedQualifiers()){
-                       PropertyDescriptor pd = 
PropertyUtils.getPropertyDescriptor(entity, fieldName);
-                       Object obj = pd.getReadMethod().invoke(entity);
-                       Qualifier q = ed.getDisplayNameMap().get(fieldName);
-                       EntitySerDeser<Object> ser = q.getSerDeser();
-                       byte[] value = ser.serialize(obj);
-                       qualifierValues.put(q.getQualifierName(), value);
-               }
-               return qualifierValues;
-       }
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntitySerDeserializer.class);
+
+    /**
+     * Creates a new instance of the entity class of ed and fills it from qualifierValues. An entry
+     * whose key is in the qualifier name map of ed is deserialized with that qualifier's SerDeser and
+     * written through the property setter; an entry with a null value is left unset. Any other entry
+     * is stored as a tag, with its value deserialized by StringSerDeser.
+     *
+     * @param qualifierValues raw values keyed by qualifier name or tag name
+     * @param ed entity definition supplying the entity class and the qualifier name map
+     * @return the populated entity, cast unchecked to T
+     * @throws NullPointerException if ed has no entity class
+     * @throws Exception on any instantiation or reflection failure
+     */
+    // TODO throws separate exceptions
+    @SuppressWarnings("unchecked")
+    public <T> T readValue(Map<String, byte[]> qualifierValues, 
EntityDefinition ed) throws Exception {
+        Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass();
+        if (clazz == null) {
+            throw new NullPointerException("Entity class of service " + 
ed.getService() + " is null");
+        }
+        TaggedLogAPIEntity obj = clazz.newInstance();
+        Map<String, Qualifier> map = ed.getQualifierNameMap();
+        for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) {
+            Qualifier q = map.get(entry.getKey());
+            if (q == null) {
+                // if it's not pre-defined qualifier, it must be tag unless 
it's a bug
+                if (obj.getTags() == null) {
+                    obj.setTags(new HashMap<String, String>());
+                }
+                obj.getTags().put(entry.getKey(), new 
StringSerDeser().deserialize(entry.getValue()));
+                continue;
+            }
+
+            // TODO performance loss compared with new operator
+            // parse different types of qualifiers
+            String fieldName = q.getDisplayName();
+            // NOTE(review): pd and pd.getWriteMethod() are used below without a null check -
+            // confirm every qualifier display name maps to a writable bean property
+            PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, 
fieldName);
+            if (entry.getValue() != null) {
+                Object args = q.getSerDeser().deserialize(entry.getValue());
+                pd.getWriteMethod().invoke(obj, args);
+                // if (logger.isDebugEnabled()) {
+                // logger.debug(entry.getKey() + ":" + args + " is 
deserialized");
+                // }
+            }
+        }
+        return (T)obj;
+    }
+
+    /**
+     * Serializes the fields listed by entity.modifiedQualifiers(). Each field is read through its
+     * property getter, serialized with the SerDeser of the matching qualifier, and stored under that
+     * qualifier's name.
+     *
+     * @param entity entity whose modified fields are written
+     * @param ed entity definition supplying the display-name-to-qualifier map
+     * @return a new map from qualifier name to serialized bytes
+     * @throws Exception on any reflection failure
+     */
+    public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, 
EntityDefinition ed) throws Exception {
+        Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>();
+        // iterate all modified qualifiers
+        for (String fieldName : entity.modifiedQualifiers()) {
+            PropertyDescriptor pd = 
PropertyUtils.getPropertyDescriptor(entity, fieldName);
+            Object obj = pd.getReadMethod().invoke(entity);
+            // NOTE(review): q is dereferenced without a null check - confirm every modified
+            // qualifier has an entry in the display name map
+            Qualifier q = ed.getDisplayNameMap().get(fieldName);
+            EntitySerDeser<Object> ser = q.getSerDeser();
+            byte[] value = ser.serialize(obj);
+            qualifierValues.put(q.getQualifierName(), value);
+        }
+        return qualifierValues;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
index c7dc113..d13e550 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
@@ -21,12 +21,16 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+/**
+ * Describes one index: a name, the columns it covers, and a uniqueness flag. It can be placed on
+ * types and is retained at runtime. None of the three elements declares a default, so each use must
+ * give all of them.
+ */
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Index {
 
+    // name of the index
     public String name();
+
+    // names of the columns the index covers
     public String[] columns();
+
+    // uniqueness flag; presumably true marks a unique index - TODO confirm
     public boolean unique();
-//     boolean unique() default true;
+    // boolean unique() default true;
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
index 2e62420..810ad6b 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
@@ -39,297 +39,304 @@ import org.apache.eagle.query.parser.ORExpression;
 import org.apache.eagle.common.ByteUtil;
 
 /**
- * Eagle index schema definition.
- * 
- * 1. Index schema can be defined in entity class by annotation.
- * 2. One index schema can contain multiple fields/tags, defined in order
- * 3. We only support immutable indexing for now
- * 4. When entity is created or deleted, the corresponding index entity should 
be created or deleted at the same time
- * 5. Index transparency to queries. Queries go through index when and only 
when index can serve all search conditions after query rewrite
- * 
- *
+ * Eagle index schema definition.
+ * <ol>
+ * <li>An index schema can be defined in an entity class by annotation.</li>
+ * <li>One index schema can contain multiple fields/tags, defined in order.</li>
+ * <li>We only support immutable indexing for now.</li>
+ * <li>When an entity is created or deleted, the corresponding index entity should be created or
+ * deleted at the same time.</li>
+ * <li>Index transparency to queries: queries go through an index when and only when the index can
+ * serve all search conditions after query rewrite.</li>
+ * </ol>
  */
 public class IndexDefinition {
-       
-       public enum IndexType {
-               UNIQUE_INDEX,
-               NON_CLUSTER_INDEX,
-               NON_INDEX
-       }
-
-       private final EntityDefinition entityDef;
-       private final Index index;
-       private final IndexColumn[] columns;
-       private final String indexPrefix;
-       
-       private static final byte[] EMPTY_VALUE = new byte[0];
-       private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-       public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
-       public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535;
-       
-       private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$";
-       private static final Pattern FIELD_NAME_PATTERN = 
Pattern.compile(FIELD_NAME_PATTERN_STRING);
-       private final static Logger LOG = 
LoggerFactory.getLogger(IndexDefinition.class);
-
-       public IndexDefinition(EntityDefinition entityDef, Index index) {
-               this.entityDef = entityDef;
-               this.index = index;
-               this.indexPrefix = entityDef.getPrefix() + "_" + index.name();
-               final String[] indexColumns = index.columns();
-               this.columns = new IndexColumn[indexColumns.length];
-               for (int i = 0; i < indexColumns.length; ++i) {
-                       final String name = indexColumns[i];
-                       final boolean isTag = entityDef.isTag(name);
-                       final Qualifier qualifier = isTag ? null : 
entityDef.getDisplayNameMap().get(name);
-                       columns[i] = new IndexColumn(name, isTag, qualifier);
-               }
-               LOG.info("Created index " + index.name() + " for " + 
entityDef.getEntityClass().getSimpleName());
-       }
-
-       public EntityDefinition getEntityDefinition() {
-               return entityDef;
-       }
-       
-       public Index getIndex() {
-               return index;
-       }
-       
-       public String getIndexName() {
-               return index.name();
-       }
-       
-       public IndexColumn[] getIndexColumns() {
-               return columns;
-       }
-       
-       public String getIndexPrefix() {
-               return indexPrefix;
-       }
-       
-       public boolean isUnique() {
-               return index.unique();
-       }
-       
-       /**
-        * Check if the query is suitable to go through index. If true, then return the value of index fields in order. Otherwise return null.
-        * TODO: currently index fields should be string type.
-        * 
-        * @param query query expression after re-write
-        * @param rowkeys if the query can go through the index, all rowkeys will be added into rowkeys.
-        * @return true if the query can go through the index, otherwise return false
-        */
-       public IndexType canGoThroughIndex(ORExpression query, List<byte[]> 
rowkeys) {
-               if (query == null || query.getANDExprList() == null || 
query.getANDExprList().isEmpty()) 
-                       return IndexType.NON_CLUSTER_INDEX;
-               if (rowkeys != null) {
-                       rowkeys.clear();
-               }
-               final Map<String, String> indexfieldMap = new HashMap<String, 
String>();
-               for(ANDExpression andExpr : query.getANDExprList()) {
-                       indexfieldMap.clear();
-                       for(AtomicExpression ae : andExpr.getAtomicExprList()) {
-                               // TODO temporarily ignore those fields which are not for attributes
-                               final String fieldName = 
parseEntityAttribute(ae.getKey());
-                               if(fieldName != null && 
ComparisonOperator.EQUAL.equals(ae.getOp())){
-                                       indexfieldMap.put(fieldName, 
ae.getValue());
-                               }
-                       }
-                       final String[] partitions = entityDef.getPartitions();
-                       int[] partitionValueHashs = null;
-                       if (partitions != null) {
-                               partitionValueHashs = new 
int[partitions.length];
-                               for (int i = 0; i < partitions.length; ++i) {
-                                       final String value = 
indexfieldMap.get(partitions[i]);
-                                       if (value == null) {
-                                               throw new 
IllegalArgumentException("Partition " + partitions[i] + " is not defined in the 
query: " + query.toString());
-                                       }
-                                       partitionValueHashs[i] = 
value.hashCode();
-                               }
-                       }
-                       final byte[][] indexFieldValues = new 
byte[columns.length][];
-                       for (int i = 0; i < columns.length; ++i) {
-                               final IndexColumn col = columns[i];
-                               if 
(!indexfieldMap.containsKey(col.getColumnName())) {
-                                       // If we have to use scan anyway, there's no need to go through index
-                                       return IndexType.NON_INDEX;
-                               }
-                               final String value = 
indexfieldMap.get(col.getColumnName());
-                               indexFieldValues[i] = value.getBytes();
-                       }
-                       final byte[] rowkey = 
generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null);
-                       if (rowkeys != null) {
-                               rowkeys.add(rowkey);
-                       }
-               }
-               if (index.unique()) {
-                       return IndexType.UNIQUE_INDEX;
-               }
-               return IndexType.NON_CLUSTER_INDEX;
-       }
-
-       private String parseEntityAttribute(String fieldName) {
-               Matcher m = FIELD_NAME_PATTERN.matcher(fieldName);
-               if(m.find()){
-                       return m.group(1);
-               }
-               return null;
-       }
-       
-       // TODO: We should move index rowkey generation later since this class is for general purpose, not only for hbase.
-       public byte[] generateIndexRowkey(TaggedLogAPIEntity entity) throws 
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-               if (entity.getClass() != entityDef.getEntityClass()) {
-                       throw new IllegalArgumentException("Expected entity 
class: " + entityDef.getEntityClass().getName() + ", but got class " + 
entity.getClass().getName());
-               }
-               final byte[][] indexValues = generateIndexValues(entity);
-               final int[] partitionHashCodes = 
generatePartitionHashCodes(entity);
-               SortedMap<Integer, Integer> tagMap = null;
-               if (!index.unique()) {
-                       // non cluster index
-                       tagMap = 
RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags());
-               }
-               
-               return generateUniqueIndexRowkey(indexValues, 
partitionHashCodes, tagMap);
-       }
-       
-       private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] 
partitionHashCodes, SortedMap<Integer, Integer> tagMap) {
-               final int prefixHashCode = indexPrefix.hashCode();
-               int totalLength = 4;
-               totalLength += (partitionHashCodes != null) ? (4 * 
partitionHashCodes.length) : 0;
-               
-               totalLength += (2 * indexValues.length);
-               for (int i = 0; i < indexValues.length; ++i) {
-                       final byte[] value = indexValues[i];
-                       totalLength += value.length;
-               }
-               if (tagMap != null && (!tagMap.isEmpty())) {
-                       totalLength += tagMap.size() * 8;
-               }
-               
-               int offset = 0;
-               final byte[] rowkey = new byte[totalLength];
-               
-               // 1. set prefix
-               ByteUtil.intToBytes(prefixHashCode, rowkey, offset);
-               offset += 4;
-               
-               // 2. set partition
-               if (partitionHashCodes != null) {
-                       for (Integer partitionHashCode : partitionHashCodes) {
-                               ByteUtil.intToBytes(partitionHashCode, rowkey, 
offset);
-                               offset += 4;
-                       }
-               }
-               
-               // 3. set index values
-               for (int i = 0; i < columns.length; ++i) {
-                       ByteUtil.shortToBytes((short)indexValues[i].length, 
rowkey, offset);
-                       offset += 2;
-                       for (int j = 0; j < indexValues[i].length; ++j) {
-                               rowkey[offset++] = indexValues[i][j];
-                       }
-               }
-               
-               // Check if it's non clustered index, then set the tag/value hash code
-               if (tagMap != null && (!tagMap.isEmpty())) {
-                       // 4. set tag key/value hashes
-                       for (Map.Entry<Integer, Integer> entry : 
tagMap.entrySet()) {
-                               ByteUtil.intToBytes(entry.getKey(), rowkey, 
offset);
-                               offset += 4;
-                               ByteUtil.intToBytes(entry.getValue(), rowkey, 
offset);
-                               offset += 4;
-                       }
-               }
-               
-               return rowkey;
-       }
-
-       private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) {
-               final String[] partitions = entityDef.getPartitions();
-               int[] result = null;
-               if (partitions != null) {
-                       result = new int[partitions.length];
-                       final Map<String, String> tags = entity.getTags();
-                       for (int i = 0 ; i < partitions.length; ++i) {
-                               final String partition = partitions[i];
-                               final String tagValue = tags.get(partition);
-                               if (tagValue != null) {
-                                       result[i] = tagValue.hashCode();
-                               } else {
-                                       result[i] = 
EMPTY_PARTITION_DEFAULT_HASH_CODE;
-                               }
-                       }
-               }
-               return result;
-       }
-
-       private byte[][] generateIndexValues(TaggedLogAPIEntity entity) throws 
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
-               final byte[][] result = new byte[columns.length][];
-               for (int i = 0; i < columns.length; ++i) {
-                       final IndexColumn column = columns[i];
-                       final String columnName = column.getColumnName();
-                       if (column.isTag) {
-                               final Map<String, String> tags = 
entity.getTags();
-                               if (tags == null || tags.get(columnName) == 
null) {
-                                       result[i] = EMPTY_VALUE;
-                               } else {
-                                       result[i] = 
tags.get(columnName).getBytes(UTF_8_CHARSET);
-                               }
-                       } else {
-                               PropertyDescriptor pd = 
column.getPropertyDescriptor();
-                               if (pd == null) {
-                                       pd = 
PropertyUtils.getPropertyDescriptor(entity, columnName);
-                                       column.setPropertyDescriptor(pd);
-                               }
-                               final Object value = 
pd.getReadMethod().invoke(entity);
-                               if (value == null) {
-                                       result[i] = EMPTY_VALUE;
-                               } else {
-                                       final Qualifier q = 
column.getQualifier();
-                                       result[i] = 
q.getSerDeser().serialize(value);
-                               }
-                       }
-                       if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) {
-                               throw new IllegalArgumentException("Index field 
value exceeded the max length: " + MAX_INDEX_VALUE_BYTE_LENGTH + ", actual 
length: " + result[i].length);
-                       }
-               }
-               return result;
-       }
-       
-       /**
-        * Index column definition class
-        *
-        */
-       public static class IndexColumn {
-               private final String columnName;
-               private final boolean isTag;
-               private final Qualifier qualifier;
-               private PropertyDescriptor propertyDescriptor;
-               
-               public IndexColumn(String columnName, boolean isTag, Qualifier 
qualifier) {
-                       this.columnName = columnName;
-                       this.isTag = isTag;
-                       this.qualifier = qualifier;
-               }
-               
-               public String getColumnName() {
-                       return columnName;
-               }
-               public boolean isTag() {
-                       return isTag;
-               }
-               
-               public Qualifier getQualifier() {
-                       return qualifier;
-               }
-
-               public PropertyDescriptor getPropertyDescriptor() {
-                       return propertyDescriptor;
-               }
-
-               public void setPropertyDescriptor(PropertyDescriptor 
propertyDescriptor) {
-                       this.propertyDescriptor = propertyDescriptor;
-               }
-               
-       }
+
+    public enum IndexType {
+        UNIQUE_INDEX,
+        NON_CLUSTER_INDEX,
+        NON_INDEX
+    }
+
+    private final EntityDefinition entityDef;
+    private final Index index;
+    private final IndexColumn[] columns;
+    private final String indexPrefix;
+
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+    public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
+    public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535;
+
+    private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$";
+    private static final Pattern FIELD_NAME_PATTERN = 
Pattern.compile(FIELD_NAME_PATTERN_STRING);
+    private static final Logger LOG = 
LoggerFactory.getLogger(IndexDefinition.class);
+
+    public IndexDefinition(EntityDefinition entityDef, Index index) {
+        this.entityDef = entityDef;
+        this.index = index;
+        this.indexPrefix = entityDef.getPrefix() + "_" + index.name();
+        final String[] indexColumns = index.columns();
+        this.columns = new IndexColumn[indexColumns.length];
+        for (int i = 0; i < indexColumns.length; ++i) {
+            final String name = indexColumns[i];
+            final boolean isTag = entityDef.isTag(name);
+            final Qualifier qualifier = isTag ? null : 
entityDef.getDisplayNameMap().get(name);
+            columns[i] = new IndexColumn(name, isTag, qualifier);
+        }
+        LOG.info("Created index " + index.name() + " for " + 
entityDef.getEntityClass().getSimpleName());
+    }
+
+    public EntityDefinition getEntityDefinition() {
+        return entityDef;
+    }
+
+    public Index getIndex() {
+        return index;
+    }
+
+    public String getIndexName() {
+        return index.name();
+    }
+
+    public IndexColumn[] getIndexColumns() {
+        return columns;
+    }
+
+    public String getIndexPrefix() {
+        return indexPrefix;
+    }
+
+    public boolean isUnique() {
+        return index.unique();
+    }
+
+    /**
+     * Check if the query is suitable to go through index. If so, one index rowkey per AND expression is added
+     * to rowkeys and the index type is returned. TODO: currently index fields should be string type.
+     * 
+     * @param query query expression after re-write
+     * @param rowkeys if the query can go through the index, all rowkeys will be added into rowkeys.
+     * @return NON_INDEX if some index column has no equality condition in the query, otherwise UNIQUE_INDEX or NON_CLUSTER_INDEX
+     */
+    public IndexType canGoThroughIndex(ORExpression query, List<byte[]> 
rowkeys) {
+        if (query == null || query.getANDExprList() == null || 
query.getANDExprList().isEmpty()) {
+            return IndexType.NON_CLUSTER_INDEX;
+        }
+        if (rowkeys != null) {
+            rowkeys.clear();
+        }
+        final Map<String, String> indexfieldMap = new HashMap<String, 
String>();
+        for (ANDExpression andExpr : query.getANDExprList()) {
+            indexfieldMap.clear();
+            for (AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+                final String fieldName = parseEntityAttribute(ae.getKey());
+                if (fieldName != null && 
ComparisonOperator.EQUAL.equals(ae.getOp())) {
+                    indexfieldMap.put(fieldName, ae.getValue());
+                }
+            }
+            final String[] partitions = entityDef.getPartitions();
+            int[] partitionValueHashs = null;
+            if (partitions != null) {
+                partitionValueHashs = new int[partitions.length];
+                for (int i = 0; i < partitions.length; ++i) {
+                    final String value = indexfieldMap.get(partitions[i]);
+                    if (value == null) {
+                        throw new IllegalArgumentException("Partition " + 
partitions[i]
+                                                           + " is not defined 
in the query: "
+                                                           + query.toString());
+                    }
+                    partitionValueHashs[i] = value.hashCode();
+                }
+            }
+            final byte[][] indexFieldValues = new byte[columns.length][];
+            for (int i = 0; i < columns.length; ++i) {
+                final IndexColumn col = columns[i];
+                if (!indexfieldMap.containsKey(col.getColumnName())) {
+                    // If we have to use scan anyway, there's no need to go through index
+                    return IndexType.NON_INDEX;
+                }
+                final String value = indexfieldMap.get(col.getColumnName());
+                indexFieldValues[i] = value.getBytes();
+            }
+            final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, 
partitionValueHashs, null);
+            if (rowkeys != null) {
+                rowkeys.add(rowkey);
+            }
+        }
+        if (index.unique()) {
+            return IndexType.UNIQUE_INDEX;
+        }
+        return IndexType.NON_CLUSTER_INDEX;
+    }
+
+    private String parseEntityAttribute(String fieldName) {
+        Matcher m = FIELD_NAME_PATTERN.matcher(fieldName);
+        if (m.find()) {
+            return m.group(1);
+        }
+        return null;
+    }
+
+    // TODO: We should move index rowkey generation later since this class is for general purpose, not only
+    // for hbase.
+    public byte[] generateIndexRowkey(TaggedLogAPIEntity entity)
+        throws IllegalAccessException, InvocationTargetException, 
NoSuchMethodException {
+        if (entity.getClass() != entityDef.getEntityClass()) {
+            throw new IllegalArgumentException("Expected entity class: "
+                                               + 
entityDef.getEntityClass().getName() + ", but got class "
+                                               + entity.getClass().getName());
+        }
+        final byte[][] indexValues = generateIndexValues(entity);
+        final int[] partitionHashCodes = generatePartitionHashCodes(entity);
+        SortedMap<Integer, Integer> tagMap = null;
+        if (!index.unique()) {
+            // non cluster index
+            tagMap = 
RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags());
+        }
+
+        return generateUniqueIndexRowkey(indexValues, partitionHashCodes, 
tagMap);
+    }
+
+    private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] 
partitionHashCodes,
+                                             SortedMap<Integer, Integer> 
tagMap) {
+        final int prefixHashCode = indexPrefix.hashCode();
+        int totalLength = 4;
+        totalLength += (partitionHashCodes != null) ? (4 * 
partitionHashCodes.length) : 0;
+
+        totalLength += (2 * indexValues.length);
+        for (int i = 0; i < indexValues.length; ++i) {
+            final byte[] value = indexValues[i];
+            totalLength += value.length;
+        }
+        if (tagMap != null && (!tagMap.isEmpty())) {
+            totalLength += tagMap.size() * 8;
+        }
+
+        int offset = 0;
+        final byte[] rowkey = new byte[totalLength];
+
+        // 1. set prefix
+        ByteUtil.intToBytes(prefixHashCode, rowkey, offset);
+        offset += 4;
+
+        // 2. set partition
+        if (partitionHashCodes != null) {
+            for (Integer partitionHashCode : partitionHashCodes) {
+                ByteUtil.intToBytes(partitionHashCode, rowkey, offset);
+                offset += 4;
+            }
+        }
+
+        // 3. set index values
+        for (int i = 0; i < columns.length; ++i) {
+            ByteUtil.shortToBytes((short)indexValues[i].length, rowkey, 
offset);
+            offset += 2;
+            for (int j = 0; j < indexValues[i].length; ++j) {
+                rowkey[offset++] = indexValues[i][j];
+            }
+        }
+
+        // Check if it's non clustered index, then set the tag/value hash code
+        if (tagMap != null && (!tagMap.isEmpty())) {
+            // 4. set tag key/value hashes
+            for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) {
+                ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
+                offset += 4;
+                ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
+                offset += 4;
+            }
+        }
+
+        return rowkey;
+    }
+
+    private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) {
+        final String[] partitions = entityDef.getPartitions();
+        int[] result = null;
+        if (partitions != null) {
+            result = new int[partitions.length];
+            final Map<String, String> tags = entity.getTags();
+            for (int i = 0; i < partitions.length; ++i) {
+                final String partition = partitions[i];
+                final String tagValue = tags.get(partition);
+                if (tagValue != null) {
+                    result[i] = tagValue.hashCode();
+                } else {
+                    result[i] = EMPTY_PARTITION_DEFAULT_HASH_CODE;
+                }
+            }
+        }
+        return result;
+    }
+
+    private byte[][] generateIndexValues(TaggedLogAPIEntity entity)
+        throws IllegalAccessException, InvocationTargetException, 
NoSuchMethodException {
+
+        final byte[][] result = new byte[columns.length][];
+        for (int i = 0; i < columns.length; ++i) {
+            final IndexColumn column = columns[i];
+            final String columnName = column.getColumnName();
+            if (column.isTag) {
+                final Map<String, String> tags = entity.getTags();
+                if (tags == null || tags.get(columnName) == null) {
+                    result[i] = EMPTY_VALUE;
+                } else {
+                    result[i] = tags.get(columnName).getBytes(UTF_8_CHARSET);
+                }
+            } else {
+                PropertyDescriptor pd = column.getPropertyDescriptor();
+                if (pd == null) {
+                    pd = PropertyUtils.getPropertyDescriptor(entity, 
columnName);
+                    column.setPropertyDescriptor(pd);
+                }
+                final Object value = pd.getReadMethod().invoke(entity);
+                if (value == null) {
+                    result[i] = EMPTY_VALUE;
+                } else {
+                    final Qualifier q = column.getQualifier();
+                    result[i] = q.getSerDeser().serialize(value);
+                }
+            }
+            if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) {
+                throw new IllegalArgumentException("Index field value exceeded 
the max length: "
+                                                   + 
MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: "
+                                                   + result[i].length);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Index column definition class
+     */
+    public static class IndexColumn {
+        private final String columnName;
+        private final boolean isTag;
+        private final Qualifier qualifier;
+        private PropertyDescriptor propertyDescriptor;
+
+        public IndexColumn(String columnName, boolean isTag, Qualifier 
qualifier) {
+            this.columnName = columnName;
+            this.isTag = isTag;
+            this.qualifier = qualifier;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public boolean isTag() {
+            return isTag;
+        }
+
+        public Qualifier getQualifier() {
+            return qualifier;
+        }
+
+        public PropertyDescriptor getPropertyDescriptor() {
+            return propertyDescriptor;
+        }
+
+        public void setPropertyDescriptor(PropertyDescriptor 
propertyDescriptor) {
+            this.propertyDescriptor = propertyDescriptor;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
index 3c82a0a..b8ada4a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
@@ -21,9 +21,11 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.TYPE})
+@Target({
+         ElementType.TYPE
+})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Indexes {
 
-       public Index[] value();
+    public Index[] value();
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
index 8831223..3daf4a1 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
@@ -18,54 +18,56 @@ package org.apache.eagle.log.entity.meta;
 
 import org.apache.eagle.common.ByteUtil;
 
-/**
- * serialize int array which is stored like the following
- * <int><int>*size, where the first <int> is the size of int
+/*
+ * Serializes an int array, stored as <int><int>*size, where the first entry is the number of ints
+ * that follow.
  */
-public class IntArraySerDeser implements EntitySerDeser<int[]>{
+public class IntArraySerDeser implements EntitySerDeser<int[]> {
+
+    public IntArraySerDeser() {
+    }
 
-       public IntArraySerDeser(){}
+    @Override
+    public int[] deserialize(byte[] bytes) {
+        if (bytes.length < 4) {
+            return null;
+        }
+        int offset = 0;
+        // get size of int array
+        int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+        int[] values = new int[size];
+        for (int i = 0; i < size; i++) {
+            values[i] = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+        }
+        return values;
+    }
 
-       @Override
-       public int[] deserialize(byte[] bytes){
-               if(bytes.length < 4)
-                       return null;
-               int offset = 0;
-               // get size of int array
-               int size = ByteUtil.bytesToInt(bytes, offset);
-               offset += 4;
-               int[] values = new int[size];
-               for(int i=0; i<size; i++){
-                       values[i] = ByteUtil.bytesToInt(bytes, offset);
-                       offset += 4;
-               }
-               return values;
-       }
-       
-       /**
-        * 
-        * @param obj
-        * @return
-        */
-       @Override
-       public byte[] serialize(int[] obj){
-               if(obj == null)
-                       return null;
-               int size = obj.length;
-               byte[] array = new byte[4 + 4*size];
-               byte[] first = ByteUtil.intToBytes(size);
-               int offset = 0;
-               System.arraycopy(first, 0, array, offset, first.length);
-               offset += first.length;
-               for(int i=0; i<size; i++){
-                       System.arraycopy(ByteUtil.intToBytes(obj[i]), 0, array, 
offset, 4);
-                       offset += 4;
-               }
-               return array;
-       }
+    /**
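+     * @param obj the int array to serialize, may be null
+     * @return a byte array of 4 + 4 * obj.length bytes (element count first, then each element), or null if obj is null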
+     */
+    @Override
+    public byte[] serialize(int[] obj) {
+        if (obj == null) {
+            return null;
+        }
+        int size = obj.length;
+        byte[] array = new byte[4 + 4 * size];
+        byte[] first = ByteUtil.intToBytes(size);
+        int offset = 0;
+        System.arraycopy(first, 0, array, offset, first.length);
+        offset += first.length;
+        for (int i = 0; i < size; i++) {
+            System.arraycopy(ByteUtil.intToBytes(obj[i]), 0, array, offset, 4);
+            offset += 4;
+        }
+        return array;
+    }
 
-       @Override
-       public Class<int[]> type() {
-               return int[].class;
-       }
+    @Override
+    public Class<int[]> type() {
+        return int[].class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
index 695badd..8353499 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
@@ -18,25 +18,28 @@ package org.apache.eagle.log.entity.meta;
 
 import org.apache.eagle.common.ByteUtil;
 
-public class IntSerDeser implements EntitySerDeser<Integer>{
-       public IntSerDeser(){}
+public class IntSerDeser implements EntitySerDeser<Integer> {
+    public IntSerDeser() {
+    }
 
-       @Override
-       public Integer deserialize(byte[] bytes){
-               if(bytes.length < 4)
-                       return null;
-               return Integer.valueOf(ByteUtil.bytesToInt(bytes));
-       }
-       
-       @Override
-       public byte[] serialize(Integer obj){
-               if(obj == null)
-                       return null;
-               return ByteUtil.intToBytes(obj);
-       }
+    @Override
+    public Integer deserialize(byte[] bytes) {
+        if (bytes.length < 4) {
+            return null;
+        }
+        return Integer.valueOf(ByteUtil.bytesToInt(bytes));
+    }
 
-       @Override
-       public Class<Integer> type() {
-               return Integer.class;
-       }
+    @Override
+    public byte[] serialize(Integer obj) {
+        if (obj == null) {
+            return null;
+        }
+        return ByteUtil.intToBytes(obj);
+    }
+
+    @Override
+    public Class<Integer> type() {
+        return Integer.class;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
index eaf5e92..b77f3ff 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
@@ -25,104 +25,104 @@ import org.apache.eagle.common.ByteUtil;
 
 /**
  * Serialization/deserialization for map type
- *
  */
 @SuppressWarnings("rawtypes")
 public class ListSerDeser implements EntitySerDeser<List> {
 
-       @SuppressWarnings({ "unchecked" })
-       @Override
-       public List deserialize(byte[] bytes) {
-               if (bytes == null || bytes.length == 0) {
-                       return null;
-               }
-               final List list = new ArrayList();
-               int offset = 0;
-               // get size of int array
-               final int size = ByteUtil.bytesToInt(bytes, offset);
-               offset += 4;
-               
-               for (int i = 0; i < size; ++i) {
-                       final int valueID = ByteUtil.bytesToInt(bytes, offset);
-                       offset += 4;
-                       final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID);
-                       if (valueClass == null) {
-                               throw new IllegalArgumentException("Unsupported value type ID: " + valueID);
-                       }
-                       final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
-                       final int valueLength = ByteUtil.bytesToInt(bytes, offset);
-                       offset += 4;
-                       final byte[] valueContent = new byte[valueLength];
-                       System.arraycopy(bytes, offset, valueContent, 0, valueLength);
-                       offset += valueLength;
-                       final Object value = valueSerDer.deserialize(valueContent);
-                       
-                       list.add(value);
-               }
-               return list;
-       }
+    @SuppressWarnings({
+        "unchecked"
+        })
+    @Override
+    public List deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        final List list = new ArrayList();
+        int offset = 0;
+        // get size of int array
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
 
-       /**
-        *  size + value1 type id + value length + value1 binary content + ...
-        *   4B         4B              4B              value1 bytes
-        */
-       @SuppressWarnings({ "unchecked" })
-       @Override
-       public byte[] serialize(List list) {
-               if(list == null)
-                       return null;
-               final int size = list.size();
-               final int[] valueIDs = new int[size];
-               final byte[][] valueBytes = new byte[size][];
-               
-               int totalSize = 4 + size * 8;
-               int i = 0;
-               Iterator iter = list.iterator();
-               while (iter.hasNext()) {
-                       final Object value = iter.next();
-                       Class<?> valueClass = value.getClass();
-                       int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+        for (int i = 0; i < size; ++i) {
+            final int valueID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID);
+            if (valueClass == null) {
+                throw new IllegalArgumentException("Unsupported value type ID: " + valueID);
+            }
+            final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+            final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final byte[] valueContent = new byte[valueLength];
+            System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+            offset += valueLength;
+            final Object value = valueSerDer.deserialize(valueContent);
 
-                       if (valueTypeID == -1) {
-                               if (value instanceof List) {
-                                       valueClass = List.class;
-                                       valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
-                               }
-                               else if (value instanceof Map) {
-                                       valueClass = Map.class;
-                                       valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
-                               }
-                               else {
-                                       throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
-                               }
-                       }
-                       valueIDs[i] = valueTypeID;
-                       final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
-                       if (valueSerDer == null) {
-                               throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
-                       }
-                       }
-                       valueBytes[i] = valueSerDer.serialize(value);
-                       totalSize += valueBytes[i].length;
-                       ++i;
-               }
-               final byte[] result = new byte[totalSize];
-               int offset = 0;
-               ByteUtil.intToBytes(size, result, offset);
-               offset += 4;
-               for (i = 0; i < size; ++i) {                    
-                       ByteUtil.intToBytes(valueIDs[i], result, offset);
-                       offset += 4;
-                       ByteUtil.intToBytes(valueBytes[i].length, result, offset);
-                       offset += 4;
-                       System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
-                       offset += valueBytes[i].length;
-               }
-               return result;
-       }
+            list.add(value);
+        }
+        return list;
+    }
 
-       @Override
-       public Class<List> type() {
-               return List.class;
-       }
-}
+    /**
+     * size + value1 type id + value length + value1 binary content + ... 4B 4B 4B value1 bytes
+     */
+    @SuppressWarnings({
+        "unchecked"
+        })
+    @Override
+    public byte[] serialize(List list) {
+        if (list == null) {
+            return null;
+        }
+        final int size = list.size();
+        final int[] valueIDs = new int[size];
+        final byte[][] valueBytes = new byte[size][];
+
+        int totalSize = 4 + size * 8;
+        int i = 0;
+        Iterator iter = list.iterator();
+        while (iter.hasNext()) {
+            final Object value = iter.next();
+            Class<?> valueClass = value.getClass();
+            int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
 
+            if (valueTypeID == -1) {
+                if (value instanceof List) {
+                    valueClass = List.class;
+                    valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                } else if (value instanceof Map) {
+                    valueClass = Map.class;
+                    valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                } else {
+                    throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+                }
+            }
+            valueIDs[i] = valueTypeID;
+            final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+            if (valueSerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+            }
+            }
+            valueBytes[i] = valueSerDer.serialize(value);
+            totalSize += valueBytes[i].length;
+            ++i;
+        }
+        final byte[] result = new byte[totalSize];
+        int offset = 0;
+        ByteUtil.intToBytes(size, result, offset);
+        offset += 4;
+        for (i = 0; i < size; ++i) {
+            ByteUtil.intToBytes(valueIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
+            offset += valueBytes[i].length;
+        }
+        return result;
+    }
+
+    @Override
+    public Class<List> type() {
+        return List.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
index 914cd95..6f0c6ab 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
@@ -18,26 +18,29 @@ package org.apache.eagle.log.entity.meta;
 
 import org.apache.eagle.common.ByteUtil;
 
-public class LongSerDeser implements EntitySerDeser<Long>{
-       public LongSerDeser(){}
+public class LongSerDeser implements EntitySerDeser<Long> {
+    public LongSerDeser() {
+    }
 
-       @Override
-       public Long deserialize(byte[] bytes){
-               if(bytes.length < 8)
-                       return null;
-//             return new Long(ByteUtil.bytesToLong(bytes));
-               return Long.valueOf(ByteUtil.bytesToLong(bytes));
-       }
-       
-       @Override
-       public byte[] serialize(Long obj){
-               if(obj == null)
-                       return null;
-               return ByteUtil.longToBytes(obj);
-       }
+    @Override
+    public Long deserialize(byte[] bytes) {
+        if (bytes.length < 8) {
+            return null;
+        }
+        // return new Long(ByteUtil.bytesToLong(bytes));
+        return Long.valueOf(ByteUtil.bytesToLong(bytes));
+    }
 
-       @Override
-       public Class<Long> type() {
-               return Long.class;
-       }
+    @Override
+    public byte[] serialize(Long obj) {
+        if (obj == null) {
+            return null;
+        }
+        return ByteUtil.longToBytes(obj);
+    }
+
+    @Override
+    public Class<Long> type() {
+        return Long.class;
+    }
 }

Reply via email to