http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
index 4e8d381..8f00b31 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
@@ -33,65 +33,66 @@ public class AbsenceWindowProcessor {
     private boolean expired; // to mark if the time range has been went through
     private OccurStatus status = OccurStatus.not_sure;
 
-    public enum OccurStatus{
+    public enum OccurStatus {
         not_sure,
         occured,
         absent
     }
 
-    public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow 
window){
+    public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow 
window) {
         this.expectAttrs = expectAttrs;
         this.window = window;
         expired = false;
     }
 
     /**
-     * return true if it is certain that expected attributes don't occur 
during startTime and endTime, else return false
-     * @param appearAttrs
-     * @param occurTime
-     * @return
+     * Sets status to occured when the expected attributes appear between startTime and endTime, or to 
absent once an event arrives after endTime without them; returns nothing, read it via checkStatus().
      */
-    public void process(List<Object> appearAttrs, long occurTime){
-        if(expired)
+    public void process(List<Object> appearAttrs, long occurTime) {
+        if (expired) {
             throw new IllegalStateException("Expired window can't recieve 
events");
-        switch(status) {
+        }
+        switch (status) {
             case not_sure:
-                if(occurTime < window.startTime) {
+                if (occurTime < window.startTime) {
                     break;
-                }else if(occurTime >= window.startTime &&
-                        occurTime <= window.endTime) {
-                    if(expectAttrs.equals(appearAttrs)) {
+                } else if (occurTime >= window.startTime
+                    && occurTime <= window.endTime) {
+                    if (expectAttrs.equals(appearAttrs)) {
                         status = OccurStatus.occured;
                     }
                     break;
-                }else{
+                } else {
                     status = OccurStatus.absent;
                     break;
                 }
             case occured:
-                if(occurTime > window.endTime)
+                if (occurTime > window.endTime) {
                     expired = true;
+                }
                 break;
             default:
                 break;
         }
         // reset status
-        if(status == OccurStatus.absent){
+        if (status == OccurStatus.absent) {
             expired = true;
         }
     }
 
-    public OccurStatus checkStatus(){
+    public OccurStatus checkStatus() {
         return status;
     }
-    public boolean checkExpired(){
+
+    public boolean checkExpired() {
         return expired;
     }
-    public AbsenceWindow currWindow(){
+
+    public AbsenceWindow currWindow() {
         return window;
     }
 
-    public String toString(){
+    public String toString() {
         return "expectAttrs=" + expectAttrs + ", status=" + status + ", 
expired=" + expired + ", window=[" + window + "]";
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
index ca1f622..33d502e 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
@@ -16,37 +16,36 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import backtype.storm.task.OutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * <h2>Thread Safe Mechanism</h2>
+ * <h2>Thread Safe Mechanism.</h2>
  * <ul>
  * <li>
- *     emit() method is thread-safe enough to be called anywhere 
asynchronously in multi-thread
+ * emit() method is thread-safe and may be called asynchronously from 
multiple threads
  * </li>
  * <li>
- *     flush() method must be called synchronously, because Storm 
OutputCollector is not thread-safe
+ * flush() method must be called synchronously, because Storm OutputCollector 
is not thread-safe
  * </li>
  * </ul>
  */
 public class AlertBoltOutputCollectorThreadSafeWrapper implements 
AlertStreamCollector {
     private final OutputCollector delegate;
     private final LinkedBlockingQueue<AlertStreamEvent> queue;
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
     private final AtomicLong lastFlushTime = new 
AtomicLong(System.currentTimeMillis());
     private final AutoAlertFlusher flusher;
-    private final static int MAX_ALERT_DELAY_SECS = 10;
+    private static final int MAX_ALERT_DELAY_SECS = 10;
 
     public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector 
outputCollector) {
         this.delegate = outputCollector;
@@ -59,7 +58,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper 
implements AlertStreamCol
     private static class AutoAlertFlusher extends Thread {
         private final AlertBoltOutputCollectorThreadSafeWrapper collector;
         private boolean stopped = false;
-        private final static Logger LOG = 
LoggerFactory.getLogger(AutoAlertFlusher.class);
+        private static final Logger LOG = 
LoggerFactory.getLogger(AutoAlertFlusher.class);
 
         private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper 
collector) {
             this.collector = collector;
@@ -75,6 +74,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper 
implements AlertStreamCol
                 try {
                     Thread.sleep(5000);
                 } catch (InterruptedException ignored) {
+                    // ignored
                 }
             }
             LOG.info("Stopped");
@@ -87,7 +87,8 @@ public class AlertBoltOutputCollectorThreadSafeWrapper 
implements AlertStreamCol
     }
 
     /**
-     * Emit method can be called in multi-thread
+     * Emit method can be called from multiple threads.
+     *
      * @param event
      */
     @Override
@@ -95,7 +96,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper 
implements AlertStreamCol
         try {
             queue.put(event);
         } catch (InterruptedException e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
         }
     }
 
@@ -104,7 +105,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapper 
implements AlertStreamCol
      */
     @Override
     public void flush() {
-        if(!queue.isEmpty()) {
+        if (!queue.isEmpty()) {
             List<AlertStreamEvent> events = new ArrayList<>();
             queue.drainTo(events);
             events.forEach((event) -> 
delegate.emit(Arrays.asList(event.getStreamId(), event)));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 8d8a4d2..042fca7 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -16,20 +16,19 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.Arrays;
-
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
 import backtype.storm.task.OutputCollector;
 
+import java.util.Arrays;
+
 public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
     private final OutputCollector delegate;
     private final Object outputLock;
     private final StreamContext streamContext;
 
-    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, 
Object outputLock, StreamContext streamContext){
+    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, 
Object outputLock, StreamContext streamContext) {
         this.delegate = outputCollector;
         this.outputLock = outputLock;
         this.streamContext = streamContext;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
index b9a109c..ee1853c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
@@ -52,14 +52,12 @@ public class AlertStreamCallback extends StreamCallback {
     }
 
     /**
-     * Possibly more than one event will be triggered for alerting
-     *
-     * @param events
+     * Possibly more than one event will be triggered for alerting.
      */
     @Override
     public void receive(Event[] events) {
         String policyName = context.getPolicyDefinition().getName();
-        CompositePolicyHandler handler = 
((PolicyGroupEvaluatorImpl)context.getPolicyEvaluator()).getPolicyHandler(policyName);
+        CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl) 
context.getPolicyEvaluator()).getPolicyHandler(policyName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("Generated {} alerts from policy '{}' in {}, index of 
definiton {} ", events.length, policyName, context.getPolicyEvaluatorId(), 
currentIndex);
         }
@@ -75,18 +73,21 @@ public class AlertStreamCallback extends StreamCallback {
             event.setCreatedTime(System.currentTimeMillis());
             event.setSchema(definition);
 
-            if (LOG.isDebugEnabled())
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("Generate new alert event: {}", event);
+            }
             try {
                 if (handler == null) {
                     // extreme case: the handler is removed from the 
evaluator. Just emit.
-                    if (LOG.isDebugEnabled()) LOG.debug(" handler not found 
when callback received event, directly emit. policy removed? ");
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" handler not found when callback received 
event, directly emit. policy removed? ");
+                    }
                     collector.emit(event);
                 } else {
                     handler.send(event, currentIndex + 1);
                 }
             } catch (Exception ex) {
-                LOG.error(String.format("send event %s to index %d failed with 
exception. ",event, currentIndex), ex);
+                LOG.error(String.format("send event %s to index %d failed with 
exception. ", event, currentIndex), ex);
             }
         }
         context.getPolicyCounter().scope(String.format("%s.%s", 
this.context.getPolicyDefinition().getName(), 
"alert_count")).incrBy(events.length);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 26ae19e..eed4b3b 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -35,17 +35,17 @@ import java.util.Map;
 public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
     private static final long serialVersionUID = -5499413193675985288L;
 
-    private final static Logger LOG = 
LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
 
     private AlertStreamCollector collector;
     // mapping from policy name to PolicyDefinition
-    private volatile Map<String,PolicyDefinition> policyDefinitionMap = new 
HashMap<>();
+    private volatile Map<String, PolicyDefinition> policyDefinitionMap = new 
HashMap<>();
     // mapping from policy name to PolicyStreamHandler
-    private volatile Map<String,CompositePolicyHandler> policyStreamHandlerMap 
= new HashMap<>();
+    private volatile Map<String, CompositePolicyHandler> 
policyStreamHandlerMap = new HashMap<>();
     private String policyEvaluatorId;
     private StreamContext context;
 
-    public PolicyGroupEvaluatorImpl(String policyEvaluatorId){
+    public PolicyGroupEvaluatorImpl(String policyEvaluatorId) {
         this.policyEvaluatorId = policyEvaluatorId;
     }
 
@@ -67,34 +67,35 @@ public class PolicyGroupEvaluatorImpl implements 
PolicyGroupEvaluator {
     }
 
     public void close() {
-        for(PolicyStreamHandler handler: policyStreamHandlerMap.values()){
+        for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) {
             try {
                 handler.close();
             } catch (Exception e) {
-                LOG.error("Failed to close handler {}",handler.toString(),e);
+                LOG.error("Failed to close handler {}", handler.toString(), e);
             }
         }
     }
 
     /**
-     * fixme make selection of PolicyStreamHandler to be more efficient
+     * fixme make selection of PolicyStreamHandler to be more efficient.
+     *
      * @param partitionedEvent PartitionedEvent
      */
-    private void dispatch(PartitionedEvent partitionedEvent){
+    private void dispatch(PartitionedEvent partitionedEvent) {
         boolean handled = false;
-        for(Map.Entry<String,CompositePolicyHandler> policyStreamHandler: 
policyStreamHandlerMap.entrySet()){
-            
if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){
+        for (Map.Entry<String, CompositePolicyHandler> policyStreamHandler : 
policyStreamHandlerMap.entrySet()) {
+            if (isAcceptedByPolicy(partitionedEvent, 
policyDefinitionMap.get(policyStreamHandler.getKey()))) {
                 try {
                     handled = true;
                     this.context.counter().scope("eval_count").incr();
                     
policyStreamHandler.getValue().send(partitionedEvent.getEvent());
                 } catch (Exception e) {
                     this.context.counter().scope("fail_count").incr();
-                    LOG.error("{} failed to handle 
{}",policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
+                    LOG.error("{} failed to handle {}", 
policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
                 }
             }
         }
-        if(!handled){
+        if (!handled) {
             this.context.counter().scope("drop_count").incr();
             LOG.warn("Drop stream non-matched event {}, which should not be 
sent to evaluator", partitionedEvent);
         } else {
@@ -103,22 +104,22 @@ public class PolicyGroupEvaluatorImpl implements 
PolicyGroupEvaluator {
     }
 
     private static boolean isAcceptedByPolicy(PartitionedEvent event, 
PolicyDefinition policy) {
-        return policy.getPartitionSpec().contains(event.getPartition()) &&
-                ( 
policy.getInputStreams().contains(event.getEvent().getStreamId()) ||
-                    
policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId())
 );
+        return policy.getPartitionSpec().contains(event.getPartition())
+            && 
(policy.getInputStreams().contains(event.getEvent().getStreamId())
+            || 
policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()));
     }
 
     @Override
     public void onPolicyChange(List<PolicyDefinition> added, 
List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, 
StreamDefinition> sds) {
-        Map<String,PolicyDefinition> copyPolicies = new 
HashMap<>(policyDefinitionMap);
-        Map<String,CompositePolicyHandler> copyHandlers = new 
HashMap<>(policyStreamHandlerMap);
-        for(PolicyDefinition pd : added){
+        Map<String, PolicyDefinition> copyPolicies = new 
HashMap<>(policyDefinitionMap);
+        Map<String, CompositePolicyHandler> copyHandlers = new 
HashMap<>(policyStreamHandlerMap);
+        for (PolicyDefinition pd : added) {
             inplaceAdd(copyPolicies, copyHandlers, pd, sds);
         }
-        for(PolicyDefinition pd : removed){
+        for (PolicyDefinition pd : removed) {
             inplaceRemove(copyPolicies, copyHandlers, pd);
         }
-        for(PolicyDefinition pd : modified){
+        for (PolicyDefinition pd : modified) {
             inplaceRemove(copyPolicies, copyHandlers, pd);
             inplaceAdd(copyPolicies, copyHandlers, pd, sds);
         }
@@ -132,9 +133,9 @@ public class PolicyGroupEvaluatorImpl implements 
PolicyGroupEvaluator {
     }
 
     private void inplaceAdd(Map<String, PolicyDefinition> policies, 
Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, 
Map<String, StreamDefinition> sds) {
-        if(handlers.containsKey(policy.getName())){
+        if (handlers.containsKey(policy.getName())) {
             LOG.error("metadata calculation error, try to add existing 
PolicyDefinition " + policy);
-        }else {
+        } else {
             policies.put(policy.getName(), policy);
             CompositePolicyHandler handler = new CompositePolicyHandler(sds);
             try {
@@ -154,20 +155,20 @@ public class PolicyGroupEvaluatorImpl implements 
PolicyGroupEvaluator {
         }
     }
 
-    private void inplaceRemove(Map<String, PolicyDefinition> policies, 
Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy)  {
-        if(handlers.containsKey(policy.getName())) {
+    private void inplaceRemove(Map<String, PolicyDefinition> policies, 
Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) {
+        if (handlers.containsKey(policy.getName())) {
             PolicyStreamHandler handler = handlers.get(policy.getName());
             try {
                 handler.close();
             } catch (Exception e) {
-                LOG.error("Failed to close handler {}",handler,e);
-            }finally {
+                LOG.error("Failed to close handler {}", handler, e);
+            } finally {
                 policies.remove(policy.getName());
                 handlers.remove(policy.getName());
-                LOG.info("Removed policy: {}",policy);
+                LOG.info("Removed policy: {}", policy);
             }
         } else {
-            LOG.error("metadata calculation error, try to remove nonexisting 
PolicyDefinition: "+policy);
+            LOG.error("metadata calculation error, try to remove nonexisting 
PolicyDefinition: " + policy);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
index 5b83abb..9b9fcac 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
@@ -16,119 +16,118 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
 import org.wso2.siddhi.query.api.definition.Attribute;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class SiddhiDefinitionAdapter {
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiDefinitionAdapter.class);
-    public final static String DEFINE_STREAM_TEMPLATE =  "define stream %s ( 
%s );";
+    private static final Logger LOG = 
LoggerFactory.getLogger(SiddhiDefinitionAdapter.class);
+    public static final String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s 
);";
 
-    public static String buildStreamDefinition(StreamDefinition 
streamDefinition){
+    public static String buildStreamDefinition(StreamDefinition 
streamDefinition) {
         List<String> columns = new ArrayList<>();
-        Preconditions.checkNotNull(streamDefinition,"StreamDefinition is 
null");
-        if(streamDefinition.getColumns()!=null) {
+        Preconditions.checkNotNull(streamDefinition, "StreamDefinition is 
null");
+        if (streamDefinition.getColumns() != null) {
             for (StreamColumn column : streamDefinition.getColumns()) {
                 columns.add(String.format("%s %s", column.getName(), 
convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
             }
-        }else{
-            LOG.warn("No columns found for stream 
{}"+streamDefinition.getStreamId());
+        } else {
+            LOG.warn("No columns found for stream {}" + 
streamDefinition.getStreamId());
         }
-        return String.format(DEFINE_STREAM_TEMPLATE, 
streamDefinition.getStreamId(),StringUtils.join(columns,","));
+        return String.format(DEFINE_STREAM_TEMPLATE, 
streamDefinition.getStreamId(), StringUtils.join(columns, ","));
     }
 
-    public static Attribute.Type 
convertToSiddhiAttributeType(StreamColumn.Type type){
-        if(_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)){
+    public static Attribute.Type 
convertToSiddhiAttributeType(StreamColumn.Type type) {
+        if (_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)) {
             return _EAGLE_SIDDHI_TYPE_MAPPING.get(type);
         }
 
-        throw new IllegalArgumentException("Unknown stream type: "+type);
+        throw new IllegalArgumentException("Unknown stream type: " + type);
     }
 
-    public static Class<?> convertToJavaAttributeType(StreamColumn.Type type){
-        if(_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)){
+    public static Class<?> convertToJavaAttributeType(StreamColumn.Type type) {
+        if (_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)) {
             return _EAGLE_JAVA_TYPE_MAPPING.get(type);
         }
 
-        throw new IllegalArgumentException("Unknown stream type: "+type);
+        throw new IllegalArgumentException("Unknown stream type: " + type);
     }
 
-    public static StreamColumn.Type convertFromJavaAttributeType(Class<?> 
type){
-        if(_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)){
+    public static StreamColumn.Type convertFromJavaAttributeType(Class<?> 
type) {
+        if (_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)) {
             return _JAVA_EAGLE_TYPE_MAPPING.get(type);
         }
 
-        throw new IllegalArgumentException("Unknown stream type: "+type);
+        throw new IllegalArgumentException("Unknown stream type: " + type);
     }
 
-    public static StreamColumn.Type 
convertFromSiddhiAttributeType(Attribute.Type type){
-        if(_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)){
+    public static StreamColumn.Type 
convertFromSiddhiAttributeType(Attribute.Type type) {
+        if (_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)) {
             return _SIDDHI_EAGLE_TYPE_MAPPING.get(type);
         }
 
-        throw new IllegalArgumentException("Unknown siddhi type: "+type);
+        throw new IllegalArgumentException("Unknown siddhi type: " + type);
     }
 
     /**
      * public enum Type {
-     *   STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
-     * }
+     * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
+     * }.
      */
-    private final static Map<StreamColumn.Type,Attribute.Type> 
_EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
-    private final static Map<StreamColumn.Type,Class<?>> 
_EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
-    private final static Map<Class<?>,StreamColumn.Type> 
_JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
-    private final static Map<Attribute.Type,StreamColumn.Type> 
_SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
+    private static final Map<StreamColumn.Type, Attribute.Type> 
_EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
+    private static final Map<StreamColumn.Type, Class<?>> 
_EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
+    private static final Map<Class<?>, StreamColumn.Type> 
_JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
+    private static final Map<Attribute.Type, StreamColumn.Type> 
_SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
 
     static {
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING,Attribute.Type.STRING);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT,Attribute.Type.INT);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG,Attribute.Type.LONG);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Attribute.Type.FLOAT);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Attribute.Type.DOUBLE);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Attribute.Type.BOOL);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Attribute.Type.OBJECT);
-
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING,String.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT,Integer.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG,Long.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Float.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Double.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Boolean.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Object.class);
-
-        _JAVA_EAGLE_TYPE_MAPPING.put(String.class,StreamColumn.Type.STRING);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class,StreamColumn.Type.INT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Long.class,StreamColumn.Type.LONG);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Float.class,StreamColumn.Type.FLOAT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Double.class,StreamColumn.Type.DOUBLE);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class,StreamColumn.Type.BOOL);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Object.class,StreamColumn.Type.OBJECT);
-
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING,StreamColumn.Type.STRING);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT,StreamColumn.Type.INT);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG,StreamColumn.Type.LONG);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT,StreamColumn.Type.FLOAT);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE,StreamColumn.Type.DOUBLE);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL,StreamColumn.Type.BOOL);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT,StreamColumn.Type.OBJECT);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, 
Attribute.Type.STRING);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, 
Attribute.Type.INT);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, 
Attribute.Type.LONG);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, 
Attribute.Type.FLOAT);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, 
Attribute.Type.DOUBLE);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, 
Attribute.Type.BOOL);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, 
Attribute.Type.OBJECT);
+
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING, String.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT, Integer.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG, Long.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Float.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Double.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Boolean.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Object.class);
+
+        _JAVA_EAGLE_TYPE_MAPPING.put(String.class, StreamColumn.Type.STRING);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class, StreamColumn.Type.INT);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Long.class, StreamColumn.Type.LONG);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Float.class, StreamColumn.Type.FLOAT);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Double.class, StreamColumn.Type.DOUBLE);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class, StreamColumn.Type.BOOL);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Object.class, StreamColumn.Type.OBJECT);
+
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING, 
StreamColumn.Type.STRING);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT, 
StreamColumn.Type.INT);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG, 
StreamColumn.Type.LONG);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT, 
StreamColumn.Type.FLOAT);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE, 
StreamColumn.Type.DOUBLE);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL, 
StreamColumn.Type.BOOL);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT, 
StreamColumn.Type.OBJECT);
     }
 
-    public static StreamDefinition 
convertFromSiddiDefinition(AbstractDefinition siddhiDefinition){
+    public static StreamDefinition 
convertFromSiddiDefinition(AbstractDefinition siddhiDefinition) {
         StreamDefinition streamDefinition = new StreamDefinition();
         streamDefinition.setStreamId(siddhiDefinition.getId());
         List<StreamColumn> columns = new 
ArrayList<>(siddhiDefinition.getAttributeNameArray().length);
-        for(Attribute attribute:siddhiDefinition.getAttributeList()){
+        for (Attribute attribute : siddhiDefinition.getAttributeList()) {
             StreamColumn column = new StreamColumn();
             
column.setType(convertFromSiddhiAttributeType(attribute.getType()));
             column.setName(attribute.getName());
@@ -136,7 +135,7 @@ public class SiddhiDefinitionAdapter {
         }
         streamDefinition.setColumns(columns);
         streamDefinition.setTimeseries(true);
-        streamDefinition.setDescription("Auto-generated stream schema from 
siddhi for "+siddhiDefinition.getId());
+        streamDefinition.setDescription("Auto-generated stream schema from 
siddhi for " + siddhiDefinition.getId());
         return streamDefinition;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index 481e3af..e7ed56f 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.Map;
 
 public class SiddhiPolicyHandler implements PolicyStreamHandler {
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyHandler.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyHandler.class);
     private ExecutionPlanRuntime executionRuntime;
     private SiddhiManager siddhiManager;
     private Map<String, StreamDefinition> sds;
@@ -43,7 +43,7 @@ public class SiddhiPolicyHandler implements 
PolicyStreamHandler {
 
     private int currentIndex = 0; // the index of current definition statement 
inside the policy definition
 
-    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index){
+    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index) {
         this.sds = sds;
         this.currentIndex = index;
     }
@@ -59,44 +59,46 @@ public class SiddhiPolicyHandler implements 
PolicyStreamHandler {
             
coreDefinition.setOutputStreams(policyDefinition.getOutputStreams());
         }
 
-        for(String inputStream : coreDefinition.getInputStreams()) {
+        for (String inputStream : coreDefinition.getInputStreams()) {
             
builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
             builder.append("\n");
         }
         builder.append(coreDefinition.value);
-        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: 
{} from definition: {}", builder.toString(), coreDefinition);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generated siddhi execution plan: {} from definition: 
{}", builder.toString(), coreDefinition);
+        }
         return builder.toString();
     }
 
     @Override
     public void prepare(final Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
-        LOG.info("Initializing handler for policy 
{}",context.getPolicyDefinition());
+        LOG.info("Initializing handler for policy {}", 
context.getPolicyDefinition());
         this.policy = context.getPolicyDefinition();
         this.siddhiManager = new SiddhiManager();
         String plan = generateExecutionPlan(policy, sds);
         try {
             this.executionRuntime = 
siddhiManager.createExecutionPlanRuntime(plan);
-            LOG.info("Created siddhi runtime {}",executionRuntime.getName());
-        }catch (Exception parserException){
-            LOG.error("Failed to create siddhi runtime for policy: {}, siddhi 
plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException);
+            LOG.info("Created siddhi runtime {}", executionRuntime.getName());
+        } catch (Exception parserException) {
+            LOG.error("Failed to create siddhi runtime for policy: {}, siddhi 
plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, 
parserException);
             throw parserException;
         }
 
         // add output stream callback
         List<String> outputStreams = getOutputStreams(policy);
-        for(final String outputStream: outputStreams) {
+        for (final String outputStream : outputStreams) {
             if 
(executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
                 this.executionRuntime.addCallback(outputStream,
-                        new AlertStreamCallback(
-                                outputStream, 
SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream))
-                                , collector, context, currentIndex));
+                    new AlertStreamCallback(
+                        outputStream, 
SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream)),
+                        collector, context, currentIndex));
             } else {
                 throw new IllegalStateException("Undefined output stream " + 
outputStream);
             }
         }
         this.executionRuntime.start();
         this.context = context;
-        LOG.info("Initialized policy handler for policy: {}",policy.getName());
+        LOG.info("Initialized policy handler for policy: {}", 
policy.getName());
     }
 
     protected List<String> getOutputStreams(PolicyDefinition policy) {
@@ -104,29 +106,29 @@ public class SiddhiPolicyHandler implements 
PolicyStreamHandler {
     }
 
     public void send(StreamEvent event) throws Exception {
-        
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr();
+        context.getPolicyCounter().scope(String.format("%s.%s", 
this.context.getPolicyDefinition().getName(), "receive_count")).incr();
         String streamId = event.getStreamId();
         InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
-        if(inputHandler != null){
-            
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr();
-            inputHandler.send(event.getTimestamp(),event.getData());
-            
+        if (inputHandler != null) {
+            context.getPolicyCounter().scope(String.format("%s.%s", 
this.context.getPolicyDefinition().getName(), "eval_count")).incr();
+            inputHandler.send(event.getTimestamp(), event.getData());
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("sent event to siddhi stream {} ", streamId);
             }
-        }else{
-            
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr();
-            LOG.warn("No input handler found for stream {}",streamId);
+        } else {
+            context.getPolicyCounter().scope(String.format("%s.%s", 
this.context.getPolicyDefinition().getName(), "drop_count")).incr();
+            LOG.warn("No input handler found for stream {}", streamId);
         }
     }
 
     public void close() throws Exception {
-        LOG.info("Closing handler for policy {}",this.policy.getName());
+        LOG.info("Closing handler for policy {}", this.policy.getName());
         this.executionRuntime.shutdown();
-        LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName());
+        LOG.info("Shutdown siddhi runtime {}", 
this.executionRuntime.getName());
         this.siddhiManager.shutdown();
-        LOG.info("Shutdown siddhi manager {}",this.siddhiManager);
-        LOG.info("Closed handler for policy {}",this.policy.getName());
+        LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
+        LOG.info("Closed handler for policy {}", this.policy.getName());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
index 43b8f30..11f484d 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
  */
 public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler {
 
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
 
     public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int 
index) {
         super(sds, index);
@@ -40,12 +40,14 @@ public class SiddhiPolicyStateHandler extends 
SiddhiPolicyHandler {
     protected String generateExecutionPlan(PolicyDefinition policyDefinition, 
Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException {
         StringBuilder builder = new StringBuilder();
         PolicyDefinition.Definition stateDefiniton = 
policyDefinition.getStateDefinition();
-        for(String inputStream : stateDefiniton.getInputStreams()) { // the 
state stream follow the output stream of the policy definition
+        for (String inputStream : stateDefiniton.getInputStreams()) { // the 
state stream follow the output stream of the policy definition
             
builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
             builder.append("\n");
         }
         builder.append(stateDefiniton.value);
-        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi state execution 
plan: {} from definiton: {}", builder.toString(), stateDefiniton);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generated siddhi state execution plan: {} from 
definiton: {}", builder.toString(), stateDefiniton);
+        }
         return builder.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
index 357504e..ef806fb 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
@@ -16,6 +16,10 @@
  */
 package org.apache.eagle.alert.engine.evaluator.nodata;
 
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -24,105 +28,101 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class DistinctValuesInTimeBatchWindow {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
-       
-       private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
-       
-       // wisb (what is should be) set for expected full set value of multiple 
columns
-       @SuppressWarnings("rawtypes")
-       private volatile Set wisb = new HashSet();
-       
-       private NoDataPolicyTimeBatchHandler handler;
-       
-       /**
-        * map from value to max timestamp for this value
-        */
-       private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-       
-       private long startTime = -1;
-       private long nextEmitTime = -1;
-       private long timeInMilliSeconds;
-
-       public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler 
handler, 
-                       long timeInMilliSeconds, @SuppressWarnings("rawtypes") 
Set wisb) {
-               this.handler = handler;
-               this.timeInMilliSeconds = timeInMilliSeconds;
-               if (wisb != null) {
-                       this.wisb = wisb;
-               }
-       }
-
-       public Map<Object, Long> distinctValues() {
-               return valueMaxTimeMap;
-       }
-       
-       public void send(StreamEvent event, Object value, long timestamp) {
-               synchronized(this) {
-                       if (startTime < 0) {
-                               startTime = System.currentTimeMillis();
-                               
-                               scheduler.scheduleAtFixedRate(new Runnable() {
-
-                                       @SuppressWarnings({ "unchecked", 
"rawtypes" })
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       LOG.info("{}/{}: {}", 
startTime, nextEmitTime, valueMaxTimeMap.keySet());
-                                                       synchronized 
(valueMaxTimeMap) {
-                                                               boolean 
sendAlerts = false;
-                                                               
-                                                               if 
(nextEmitTime < 0) {
-                                                                       
nextEmitTime = startTime + timeInMilliSeconds;
-                                                               }
-                                                               
-                                                               if 
(System.currentTimeMillis() > nextEmitTime) {
-                                                                       
startTime = nextEmitTime;
-                                                                       
nextEmitTime += timeInMilliSeconds;
-                                                                       
sendAlerts = true;
-                                                               } else {
-                                                                       
sendAlerts = false;
-                                                               }
-                                                               
-                                                               if (sendAlerts) 
{
-                                                                       // alert
-                                                                       
handler.compareAndEmit(wisb, distinctValues().keySet(), event);
-                                                                       
LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), 
wisb);
-                                                                       
-                                                                       if 
(distinctValues().keySet().size() > 0) {
-                                                                               
wisb = new HashSet(distinctValues().keySet());
-                                                                       }
-                                                                       
valueMaxTimeMap.clear();
-                                                                       
LOG.info("Clear wiri & update wisb to {}", wisb);
-                                                               }
-                                                       }
-                                               } catch (Throwable t) {
-                                                       LOG.error("failed to 
run batch window for gap alert", t);
-                                               }
-                                       }
-                                       
-                               }, 0, timeInMilliSeconds / 2, 
TimeUnit.MILLISECONDS);
-                       }
-               }
-               
-               if (valueMaxTimeMap.containsKey(value)) {
-                       // remove that entry with old timestamp in timeSortedMap
-                       long oldTime = valueMaxTimeMap.get(value);
-                       if (oldTime >= timestamp) {
-                               // no any effect as the new timestamp is equal 
or even less than
-                               // old timestamp
-                               return;
-                       }
-               }
-               // update new timestamp in valueMaxTimeMap
-               valueMaxTimeMap.put(value, timestamp);
-               
-               LOG.info("sent: {} with start: {}/next: {}", 
valueMaxTimeMap.keySet(), startTime, nextEmitTime);
-       }
+    private static final Logger LOG = 
LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
+
+    private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+
+    // wisb (what it should be): the expected full set of values for multiple columns
+    @SuppressWarnings("rawtypes")
+    private volatile Set wisb = new HashSet();
+
+    private NoDataPolicyTimeBatchHandler handler;
+
+    /**
+     * map from value to max timestamp for this value.
+     */
+    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+
+    private long startTime = -1;
+    private long nextEmitTime = -1;
+    private long timeInMilliSeconds;
+
+    public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler 
handler,
+                                           long timeInMilliSeconds, 
@SuppressWarnings("rawtypes") Set wisb) {
+        this.handler = handler;
+        this.timeInMilliSeconds = timeInMilliSeconds;
+        if (wisb != null) {
+            this.wisb = wisb;
+        }
+    }
+
+    public Map<Object, Long> distinctValues() {
+        return valueMaxTimeMap;
+    }
+
+    public void send(StreamEvent event, Object value, long timestamp) {
+        synchronized (this) {
+            if (startTime < 0) {
+                startTime = System.currentTimeMillis();
+
+                scheduler.scheduleAtFixedRate(new Runnable() {
+
+                    @SuppressWarnings( {"unchecked", "rawtypes"})
+                    @Override
+                    public void run() {
+                        try {
+                            LOG.info("{}/{}: {}", startTime, nextEmitTime, 
valueMaxTimeMap.keySet());
+                            synchronized (valueMaxTimeMap) {
+                                boolean sendAlerts = false;
+
+                                if (nextEmitTime < 0) {
+                                    nextEmitTime = startTime + 
timeInMilliSeconds;
+                                }
+
+                                if (System.currentTimeMillis() > nextEmitTime) 
{
+                                    startTime = nextEmitTime;
+                                    nextEmitTime += timeInMilliSeconds;
+                                    sendAlerts = true;
+                                } else {
+                                    sendAlerts = false;
+                                }
+
+                                if (sendAlerts) {
+                                    // alert
+                                    handler.compareAndEmit(wisb, 
distinctValues().keySet(), event);
+                                    LOG.info("alert for wiri: {} compares to 
wisb: {}", distinctValues().keySet(), wisb);
+
+                                    if (distinctValues().keySet().size() > 0) {
+                                        wisb = new 
HashSet(distinctValues().keySet());
+                                    }
+                                    valueMaxTimeMap.clear();
+                                    LOG.info("Clear wiri & update wisb to {}", 
wisb);
+                                }
+                            }
+                        } catch (Throwable t) {
+                            LOG.error("failed to run batch window for gap 
alert", t);
+                        }
+                    }
+
+                }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        if (valueMaxTimeMap.containsKey(value)) {
+            // look up the max timestamp already recorded for this value
+            long oldTime = valueMaxTimeMap.get(value);
+            if (oldTime >= timestamp) {
+                // no effect: the new timestamp is equal to or older than
+                // the recorded timestamp
+                return;
+            }
+        }
+        // update new timestamp in valueMaxTimeMap
+        valueMaxTimeMap.put(value, timestamp);
+
+        LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), 
startTime, nextEmitTime);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
index 676357a..4aae040 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
@@ -16,70 +16,69 @@
  */
 package org.apache.eagle.alert.engine.evaluator.nodata;
 
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
+import java.util.*;
+
 /**
  * Since 6/28/16.
  * to get distinct values within a specified time window
  * valueMaxTimeMap : each distinct value is associated with max timestamp it 
ever had
  * timeSortedMap : map sorted by timestamp first and then value
- * With the above 2 data structure, we can get distinct values in LOG(N)
+ * With the above 2 data structures, we can get distinct values in O(log N).
  */
 public class DistinctValuesInTimeWindow {
-    public static class ValueAndTime{
+    public static class ValueAndTime {
         Object value;
         long timestamp;
-        public ValueAndTime(Object value, long timestamp){
+
+        public ValueAndTime(Object value, long timestamp) {
             this.value = value;
             this.timestamp = timestamp;
         }
 
-        public String toString(){
+        public String toString() {
             return "[" + value + "," + timestamp + "]";
         }
 
-        public int hashCode(){
+        public int hashCode() {
             return new 
HashCodeBuilder().append(value).append(timestamp).toHashCode();
         }
 
-        public boolean equals(Object that){
-            if(!(that instanceof ValueAndTime))
+        public boolean equals(Object that) {
+            if (!(that instanceof ValueAndTime)) {
                 return false;
-            ValueAndTime another = (ValueAndTime)that;
+            }
+            ValueAndTime another = (ValueAndTime) that;
             return another.timestamp == this.timestamp && 
another.value.equals(this.value);
         }
     }
 
-    public static class ValueAndTimeComparator implements 
Comparator<ValueAndTime>{
+    public static class ValueAndTimeComparator implements 
Comparator<ValueAndTime> {
         @Override
         public int compare(ValueAndTime o1, ValueAndTime o2) {
-            if(o1.timestamp != o2.timestamp)
+            if (o1.timestamp != o2.timestamp) {
                 return (o1.timestamp > o2.timestamp) ? 1 : -1;
-            if(o1.value.equals(o2.value))
+            }
+            if (o1.value.equals(o2.value)) {
                 return 0;
-            else {
+            } else {
                 // this is not strictly correct, but I don't want to write too 
many comparators here :-)
-                if(o1.hashCode() > o2.hashCode())
+                if (o1.hashCode() > o2.hashCode()) {
                     return 1;
-                else
+                } else {
                     return -1;
+                }
             }
         }
     }
 
     /**
-     * map from value to max timestamp for this value
+     * map from value to max timestamp for this value.
      */
     private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
     /**
-     * map sorted by time(max timestamp for the value) and then value
+     * map sorted by time(max timestamp for the value) and then value.
      */
     private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new 
TreeMap<>(new ValueAndTimeComparator());
     private long maxTimestamp = 0L;
@@ -87,20 +86,20 @@ public class DistinctValuesInTimeWindow {
     private boolean windowSlided;
 
     /**
-     * @param window - milliseconds
+     * @param window - milliseconds.
      */
-    public DistinctValuesInTimeWindow(long window){
+    public DistinctValuesInTimeWindow(long window) {
         this.window = window;
     }
 
-    public void send(Object value, long timestamp){
+    public void send(Object value, long timestamp) {
         ValueAndTime vt = new ValueAndTime(value, timestamp);
 
         // todo think of time out of order
-        if(valueMaxTimeMap.containsKey(value)){
+        if (valueMaxTimeMap.containsKey(value)) {
             // remove that entry with old timestamp in timeSortedMap
             long oldTime = valueMaxTimeMap.get(value);
-            if(oldTime >= timestamp){
+            if (oldTime >= timestamp) {
                 // no any effect as the new timestamp is equal or even less 
than old timestamp
                 return;
             }
@@ -117,25 +116,25 @@ public class DistinctValuesInTimeWindow {
 
         // check if some values should be evicted because of time window
         Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = 
timeSortedMap.entrySet().iterator();
-        while(it.hasNext()){
+        while (it.hasNext()) {
             Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
-            if(entry.getKey().timestamp < maxTimestamp - window){
+            if (entry.getKey().timestamp < maxTimestamp - window) {
                 // should remove the entry in valueMaxTimeMap and timeSortedMap
                 valueMaxTimeMap.remove(entry.getKey().value);
                 windowSlided = true;
 
                 it.remove();
-            }else {
+            } else {
                 break;
             }
         }
     }
 
-    public Map<Object, Long> distinctValues(){
+    public Map<Object, Long> distinctValues() {
         return valueMaxTimeMap;
     }
 
-    public boolean windowSlided(){
+    public boolean windowSlided() {
         return windowSlided;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index 8f37b93..ec6e6e9 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.eagle.alert.engine.evaluator.nodata;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -25,6 +24,7 @@ import 
org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,56 +39,54 @@ import java.util.*;
  * 2. timestamp field: timestamp column
  * 3. wiri safe time window: how long window is good for full set of wiri
  * 4. wisb: full set
- *
  * No data policy definition should include
  * fixed fields and dynamic fields
  * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, 
f2_name
  * dynamic fields depend on wisb type.
- *
  * policy would be like:
  * {
- "name": "noDataAlertPolicy",
- "description": "noDataAlertPolicy",
- "inputStreams": [
- "noDataAlertStream"
- ],
- "outputStreams": [
- "noDataAlertStream_out"
- ],
- "definition": {
- "type": "nodataalert",
- "value": "PT1M,plain,1,host,host1,host2"   // or "value": 
"PT1M,dynamic,1,host"
- },
- "partitionSpec": [
- {
- "streamId": "noDataAlertStream",
- "type": "GROUPBY"
- }
- ],
- "parallelismHint": 2
- }
-     "name": "noDataAlertPolicy",
-     "description": "noDataAlertPolicy",
-     "inputStreams": [
-        "noDataAlertStream"
-     ],
-     "outputStreams": [
-        "noDataAlertStream_out"
-     ],
-     "definition": {
-        "type": "nodataalert",
-        "value": "PT1M,plain,1,host,host1,host2"   // or "value": 
"PT1M,dynamic,1,host"
-     },
-     "partitionSpec": [
-     {
-        "streamId": "noDataAlertStream",
-        "type": "GROUPBY"
-     }
-     ],
-     "parallelismHint": 2
-   }
+ * "name": "noDataAlertPolicy",
+ * "description": "noDataAlertPolicy",
+ * "inputStreams": [
+ * "noDataAlertStream"
+ * ],
+ * "outputStreams": [
+ * "noDataAlertStream_out"
+ * ],
+ * "definition": {
+ * "type": "nodataalert",
+ * "value": "PT1M,plain,1,host,host1,host2"   // or "value": 
"PT1M,dynamic,1,host"
+ * },
+ * "partitionSpec": [
+ * {
+ * "streamId": "noDataAlertStream",
+ * "type": "GROUPBY"
+ * }
+ * ],
+ * "parallelismHint": 2
+ * }
+ * "name": "noDataAlertPolicy",
+ * "description": "noDataAlertPolicy",
+ * "inputStreams": [
+ * "noDataAlertStream"
+ * ],
+ * "outputStreams": [
+ * "noDataAlertStream_out"
+ * ],
+ * "definition": {
+ * "type": "nodataalert",
+ * "value": "PT1M,plain,1,host,host1,host2"   // or "value": 
"PT1M,dynamic,1,host"
+ * },
+ * "partitionSpec": [
+ * {
+ * "streamId": "noDataAlertStream",
+ * "type": "GROUPBY"
+ * }
+ * ],
+ * "parallelismHint": 2
+ * }
  */
-public class NoDataPolicyHandler implements PolicyStreamHandler{
+public class NoDataPolicyHandler implements PolicyStreamHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(NoDataPolicyHandler.class);
     private Map<String, StreamDefinition> sds;
 
@@ -103,9 +101,10 @@ public class NoDataPolicyHandler implements 
PolicyStreamHandler{
     private volatile NoDataWisbType wisbType;
     private volatile DistinctValuesInTimeWindow distinctWindow;
 
-    public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
+    public NoDataPolicyHandler(Map<String, StreamDefinition> sds) {
         this.sds = sds;
     }
+
     @Override
     public void prepare(Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
         this.collector = collector;
@@ -113,11 +112,13 @@ public class NoDataPolicyHandler implements 
PolicyStreamHandler{
         this.policyDef = context.getPolicyDefinition();
         List<String> inputStreams = policyDef.getInputStreams();
         // validate inputStreams has to contain only one stream
-        if(inputStreams.size() != 1)
+        if (inputStreams.size() != 1) {
             throw new IllegalArgumentException("policy inputStream size has to 
be 1 for no data alert");
+        }
         // validate outputStream has to contain only one stream
-        if(policyDef.getOutputStreams().size() != 1)
+        if (policyDef.getOutputStreams().size() != 1) {
             throw new IllegalArgumentException("policy outputStream size has 
to be 1 for no data alert");
+        }
 
         String is = inputStreams.get(0);
         StreamDefinition sd = sds.get(is);
@@ -129,23 +130,23 @@ public class NoDataPolicyHandler implements 
PolicyStreamHandler{
         distinctWindow = new DistinctValuesInTimeWindow(windowPeriod);
         this.wisbType = NoDataWisbType.valueOf(segments[1]);
         // for provided wisb values, need to parse, for dynamic wisb values, 
it is computed through a window
-        if(wisbType == NoDataWisbType.provided) {
+        if (wisbType == NoDataWisbType.provided) {
             wisbValues = new NoDataWisbProvidedParser().parse(segments);
         }
         // populate wisb field names
         int numOfFields = Integer.parseInt(segments[2]);
-        for(int i = 3; i < 3+numOfFields; i++){
+        for (int i = 3; i < 3 + numOfFields; i++) {
             String fn = segments[i];
             wisbFieldIndices.add(sd.getColumnIndex(fn));
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @SuppressWarnings( {"rawtypes", "unchecked"})
     @Override
     public void send(StreamEvent event) throws Exception {
         Object[] data = event.getData();
         List<Object> columnValues = new ArrayList<>();
-        for(int i=0; i<wisbFieldIndices.size(); i++){
+        for (int i = 0; i < wisbFieldIndices.size(); i++) {
             Object o = data[wisbFieldIndices.get(i)];
             // convert value to string
             columnValues.add(o.toString());
@@ -155,18 +156,18 @@ public class NoDataPolicyHandler implements 
PolicyStreamHandler{
 
         LOG.debug("window slided: {}, with wiri: {}", 
distinctWindow.windowSlided(), distinctWindow.distinctValues());
 
-        if(distinctWindow.windowSlided()) {
+        if (distinctWindow.windowSlided()) {
             compareAndEmit(wisbValues, wiriValues, event);
         }
 
-        if(wisbType == NoDataWisbType.dynamic) {
+        if (wisbType == NoDataWisbType.dynamic) {
             // deep copy
             wisbValues = new HashSet<>(wiriValues);
         }
     }
 
     @SuppressWarnings("rawtypes")
-    private void compareAndEmit(Set wisb, Set wiri, StreamEvent event){
+    private void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
         // compare with wisbValues if wisbValues are already there for dynamic 
type
         Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
         LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", 
wiri: " + wiri);
@@ -177,9 +178,9 @@ public class NoDataPolicyHandler implements 
PolicyStreamHandler{
         }
     }
 
-    private AlertStreamEvent createAlertEvent(long timestamp, Object[] 
triggerEvent){
+    private AlertStreamEvent createAlertEvent(long timestamp, Object[] 
triggerEvent) {
         String is = policyDef.getInputStreams().get(0);
-        StreamDefinition sd = sds.get(is);
+        final StreamDefinition sd = sds.get(is);
 
         AlertStreamEvent event = new AlertStreamEvent();
         event.setTimestamp(timestamp);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
index 53a7af0..73ee9b2 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.eagle.alert.engine.evaluator.nodata;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -25,6 +24,7 @@ import 
org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.storm.guava.base.Joiner;
 import org.joda.time.Period;
 import org.slf4j.Logger;
@@ -33,132 +33,134 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
-       private Map<String, StreamDefinition> sds;
-       
-       private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
-       // reuse PolicyDefinition.defintion.value field to store full set of 
values
-       // separated by comma
-       private volatile PolicyDefinition policyDef;
-       private volatile Collector<AlertStreamEvent> collector;
-       private volatile PolicyHandlerContext context;
-       private volatile NoDataWisbType wisbType;
-       private volatile DistinctValuesInTimeBatchWindow distinctWindow;
-
-       public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds){
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
+    private Map<String, StreamDefinition> sds;
+
+    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
+    // reuse PolicyDefinition.definition.value field to store full set of values
+    // separated by comma
+    private volatile PolicyDefinition policyDef;
+    private volatile Collector<AlertStreamEvent> collector;
+    private volatile PolicyHandlerContext context;
+    private volatile NoDataWisbType wisbType;
+    private volatile DistinctValuesInTimeBatchWindow distinctWindow;
+
+    public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) {
         this.sds = sds;
     }
 
-       @Override
-       public void prepare(Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
-               this.collector = collector;
-               this.context = context;
-               this.policyDef = context.getPolicyDefinition();
-               List<String> inputStreams = policyDef.getInputStreams();
-               // validate inputStreams has to contain only one stream
-               if (inputStreams.size() != 1)
-                       throw new IllegalArgumentException("policy inputStream 
size has to be 1 for no data alert");
-               // validate outputStream has to contain only one stream
-               if (policyDef.getOutputStreams().size() != 1)
-                       throw new IllegalArgumentException("policy outputStream 
size has to be 1 for no data alert");
-
-               String is = inputStreams.get(0);
-               StreamDefinition sd = sds.get(is);
-
-               String policyValue = policyDef.getDefinition().getValue();
-               // assume that no data alert policy value consists of 
"windowPeriod,
-               // type, numOfFields, f1_name, f2_name, f1_value, f2_value, 
f1_value,
-               // f2_value}
-               String[] segments = policyValue.split(",");
-               this.wisbType = NoDataWisbType.valueOf(segments[1]);
-               // for provided wisb values, need to parse, for dynamic wisb 
values, it
-               // is computed through a window
-               @SuppressWarnings("rawtypes")
-               Set wisbValues = null;
-               if (wisbType == NoDataWisbType.provided) {
-                       wisbValues = new 
NoDataWisbProvidedParser().parse(segments);
-               }
-               long windowPeriod = 
TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
-               distinctWindow = new DistinctValuesInTimeBatchWindow(this, 
windowPeriod, wisbValues);
-               // populate wisb field names
-               int numOfFields = Integer.parseInt(segments[2]);
-               for (int i = 3; i < 3 + numOfFields; i++) {
-                       String fn = segments[i];
-                       wisbFieldIndices.add(sd.getColumnIndex(fn));
-               }
-       }
-
-       @Override
-       public void send(StreamEvent event) throws Exception {
-               Object[] data = event.getData();
-               
-               List<Object> columnValues = new ArrayList<>();
-               for (int i = 0; i < wisbFieldIndices.size(); i++) {
-                       Object o = data[wisbFieldIndices.get(i)];
-                       // convert value to string
-                       columnValues.add(o.toString());
-               }
-               // use local timestamp rather than event timestamp
-               distinctWindow.send(event, columnValues, 
System.currentTimeMillis());
-               LOG.debug("event sent to window with wiri: {}", 
distinctWindow.distinctValues());
-       }
-       
-       @SuppressWarnings("rawtypes")
-       public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
-               // compare with wisbValues if wisbValues are already there for 
dynamic
-               // type
-               Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
-               LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + 
", wiri: " + wiri);
-               if (noDataValues != null && noDataValues.size() > 0) {
-                       LOG.info("No data alert is triggered with no data 
values {} and wisb {}", noDataValues, wisb);
-                       
-                       String is = policyDef.getOutputStreams().get(0);
-                       StreamDefinition sd = sds.get(is);
-                       int timestampIndex = sd.getColumnIndex("timestamp");
-                       int hostIndex = sd.getColumnIndex("host");
-                       int originalStreamNameIndex = 
sd.getColumnIndex("originalStreamName");
-                       
-                       for (Object one : noDataValues) {
-                               Object[] triggerEvent = new 
Object[sd.getColumns().size()];
-                               for (int i = 0; i < sd.getColumns().size(); i 
++) {
-                                       if (i == timestampIndex) {
-                                               triggerEvent[i] = 
System.currentTimeMillis();
-                                       } else if (i == hostIndex) {
-                                               triggerEvent[hostIndex] = 
((List) one).get(0);
-                                       } else if (i == 
originalStreamNameIndex) {
-                                               
triggerEvent[originalStreamNameIndex] = event.getStreamId();
-                                       } else if (sd.getColumns().size() < i) {
-                                               LOG.error("strema event data 
have different lenght compare to column definition!");
-                                       } else {
-                                               triggerEvent[i] = 
sd.getColumns().get(i).getDefaultValue();
-                                       }
-                               }
-                               AlertStreamEvent alertEvent = 
createAlertEvent(sd, event.getTimestamp(), triggerEvent);
-                               LOG.info(String.format("Nodata alert %s 
generated and will be emitted", Joiner.on(",").join(triggerEvent)));
-                               collector.emit(alertEvent);
-                       }
-                       
-               }
-       }
-
-       private AlertStreamEvent createAlertEvent(StreamDefinition sd, long 
timestamp, Object[] triggerEvent) {
-               AlertStreamEvent event = new AlertStreamEvent();
-               event.setTimestamp(timestamp);
-               event.setData(triggerEvent);
-               event.setStreamId(policyDef.getOutputStreams().get(0));
-               event.setPolicyId(context.getPolicyDefinition().getName());
-               if (this.context.getPolicyEvaluator() != null) {
-                       
event.setCreatedBy(context.getPolicyEvaluator().getName());
-               }
-               event.setCreatedTime(System.currentTimeMillis());
-               event.setSchema(sd);
-               return event;
-       }
-
-       @Override
-       public void close() throws Exception {
-
-       }
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        this.context = context;
+        this.policyDef = context.getPolicyDefinition();
+        List<String> inputStreams = policyDef.getInputStreams();
+        // validate inputStreams has to contain only one stream
+        if (inputStreams.size() != 1) {
+            throw new IllegalArgumentException("policy inputStream size has to 
be 1 for no data alert");
+        }
+        // validate outputStream has to contain only one stream
+        if (policyDef.getOutputStreams().size() != 1) {
+            throw new IllegalArgumentException("policy outputStream size has 
to be 1 for no data alert");
+        }
+
+        String is = inputStreams.get(0);
+        StreamDefinition sd = sds.get(is);
+
+        String policyValue = policyDef.getDefinition().getValue();
+        // assume that no data alert policy value consists of "windowPeriod,
+        // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value,
+        // f2_value"
+        String[] segments = policyValue.split(",");
+        this.wisbType = NoDataWisbType.valueOf(segments[1]);
+        // for provided wisb values, need to parse, for dynamic wisb values, it
+        // is computed through a window
+        @SuppressWarnings("rawtypes")
+        Set wisbValues = null;
+        if (wisbType == NoDataWisbType.provided) {
+            wisbValues = new NoDataWisbProvidedParser().parse(segments);
+        }
+        long windowPeriod = 
TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
+        distinctWindow = new DistinctValuesInTimeBatchWindow(this, 
windowPeriod, wisbValues);
+        // populate wisb field names
+        int numOfFields = Integer.parseInt(segments[2]);
+        for (int i = 3; i < 3 + numOfFields; i++) {
+            String fn = segments[i];
+            wisbFieldIndices.add(sd.getColumnIndex(fn));
+        }
+    }
+
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        Object[] data = event.getData();
+
+        List<Object> columnValues = new ArrayList<>();
+        for (int i = 0; i < wisbFieldIndices.size(); i++) {
+            Object o = data[wisbFieldIndices.get(i)];
+            // convert value to string
+            columnValues.add(o.toString());
+        }
+        // use local timestamp rather than event timestamp
+        distinctWindow.send(event, columnValues, System.currentTimeMillis());
+        LOG.debug("event sent to window with wiri: {}", 
distinctWindow.distinctValues());
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
+        // compare with wisbValues if wisbValues are already there for dynamic
+        // type
+        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
+        LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", 
wiri: " + wiri);
+        if (noDataValues != null && noDataValues.size() > 0) {
+            LOG.info("No data alert is triggered with no data values {} and 
wisb {}", noDataValues, wisb);
+
+            String is = policyDef.getOutputStreams().get(0);
+            StreamDefinition sd = sds.get(is);
+            int timestampIndex = sd.getColumnIndex("timestamp");
+            int hostIndex = sd.getColumnIndex("host");
+            int originalStreamNameIndex = 
sd.getColumnIndex("originalStreamName");
+
+            for (Object one : noDataValues) {
+                Object[] triggerEvent = new Object[sd.getColumns().size()];
+                for (int i = 0; i < sd.getColumns().size(); i++) {
+                    if (i == timestampIndex) {
+                        triggerEvent[i] = System.currentTimeMillis();
+                    } else if (i == hostIndex) {
+                        triggerEvent[hostIndex] = ((List) one).get(0);
+                    } else if (i == originalStreamNameIndex) {
+                        triggerEvent[originalStreamNameIndex] = 
event.getStreamId();
+                    } else if (sd.getColumns().size() < i) {
+                        LOG.error("strema event data have different lenght 
compare to column definition!");
+                    } else {
+                        triggerEvent[i] = 
sd.getColumns().get(i).getDefaultValue();
+                    }
+                }
+                AlertStreamEvent alertEvent = createAlertEvent(sd, 
event.getTimestamp(), triggerEvent);
+                LOG.info(String.format("Nodata alert %s generated and will be 
emitted", Joiner.on(",").join(triggerEvent)));
+                collector.emit(alertEvent);
+            }
+
+        }
+    }
+
+    private AlertStreamEvent createAlertEvent(StreamDefinition sd, long 
timestamp, Object[] triggerEvent) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setTimestamp(timestamp);
+        event.setData(triggerEvent);
+        event.setStreamId(policyDef.getOutputStreams().get(0));
+        event.setPolicyId(context.getPolicyDefinition().getName());
+        if (this.context.getPolicyEvaluator() != null) {
+            event.setCreatedBy(context.getPolicyEvaluator().getName());
+        }
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setSchema(sd);
+        return event;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
index fe06067..fa27108 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
@@ -25,7 +25,8 @@ import java.util.Set;
 public interface NoDataWisbParser {
     /**
      * parse policy definition and return WISB values for one or multiple 
fields
-     * for example host and data center are 2 fields for no data alert, then 
WISB is a list of two values
+     * for example host and data center are 2 fields for no data alert, then 
WISB is a list of two values.
+     *
      * @param args some information parsed from policy defintion
      * @return list of list of field values
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
index e13826a..4f54358 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
@@ -24,19 +24,19 @@ import java.util.Set;
 /**
  * Since 6/29/16.
  */
-public class NoDataWisbProvidedParser implements NoDataWisbParser{
-    @Override
+public class NoDataWisbProvidedParser implements NoDataWisbParser {
     /**
-     * policy value consists of "windowPeriod, type, numOfFields, f1_name, 
f2_name, f1_value, f2_value, f1_value, f2_value"
+     * policy value consists of "windowPeriod, type, numOfFields, f1_name, 
f2_name, f1_value, f2_value, f1_value, f2_value".
      */
+    @Override
     public Set<List<String>> parse(String[] args) {
         int numOfFields = Integer.parseInt(args[2]);
         Set<List<String>> wisbValues = new HashSet<>();
         int i = 3 + numOfFields;
-        while(i<args.length){
+        while (i < args.length) {
             List<String> fields = new ArrayList<>();
-            for(int j=0; j<numOfFields; j++){
-                fields.add(args[i+j]);
+            for (int j = 0; j < numOfFields; j++) {
+                fields.add(args[i + j]);
             }
             wisbValues.add(fields);
             i += numOfFields;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
index ed04d5a..2f71c7f 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
@@ -25,8 +25,8 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
  */
 public interface AlertDeduplicator {
 
-       AlertStreamEvent dedup(AlertStreamEvent event);
+    AlertStreamEvent dedup(AlertStreamEvent event);
 
-       void setDedupIntervalMin(String intervalMin);
+    void setDedupIntervalMin(String intervalMin);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
index 8f1c248..fdb01a7 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
@@ -1,10 +1,4 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,6 +14,12 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+
+import java.util.List;
+
 public interface AlertPublishListener {
     void onPublishChange(List<Publishment> added,
                          List<Publishment> removed,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
index d24bdb0..4c3a2ad 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -17,22 +17,22 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
-import java.io.Closeable;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
-
 import com.typesafe.config.Config;
 
+import java.io.Closeable;
+import java.util.Map;
+
 /**
  * Created on 2/10/16.
  * Notification Plug-in interface which provide abstraction layer to notify to 
different system
  */
 public interface AlertPublishPlugin extends Closeable {
     /**
-     * 
+     * Init alert publish plugin.
+     *
      * @param config
      * @param publishment
      * @param configProperties - storm config that would be useful for some 
implementation


Reply via email to