http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
index d897454..80990a5 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
@@ -35,7 +35,7 @@ public class TestTopologyMgmtService extends 
TopologyMgmtService {
     private int routerNumber;
     private int i = 0;
     private String namePrefix = "Topology";
-    
+
     // a config used to check if createTopology is enabled. FIXME: another 
class of mgmt service might be better
     private boolean enableCreateTopology = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 1ef71a0..72b8012 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -14,38 +14,38 @@
 # limitations under the License.
 
 {
-       "coordinator" : {
-               "policiesPerBolt" : 5,
-               "boltParallelism" : 5,
-               "policyDefaultParallelism" : 5,
-               "boltLoadUpbound": 0.8,
-               "topologyLoadUpbound" : 0.8,
-               "numOfAlertBoltsPerTopology" : 5,
-               "zkConfig" : {
-                       "zkQuorum" : "localhost:2181",
-                       "zkRoot" : "/alert",
-                       "zkSessionTimeoutMs" : 10000,
-                       "connectionTimeoutMs" : 10000,
-                       "zkRetryTimes" : 3,
-                       "zkRetryInterval" : 3000
-               }
-               "metadataService" : {
-                       "host" : "localhost",
-                       "port" : 8080,
-                       "context" : "/rest"
-               }
-               "metadataDynamicCheck" : {
-                       "initDelayMillis" : 1000,
-                       "delayMillis" : 30000
-               },
-               "kafkaProducer": {
-                       "bootstrapServers": "localhost:9092"
-               },
-               "email": {
-                       "sender": "ea...@eagle.com",
-                       "recipients": "t...@eagle.com",
-                       "mailSmtpHost": "test.eagle.com",
-                       "mailSmtpPort": "25"
-               }
-       }
+  "coordinator": {
+    "policiesPerBolt": 5,
+    "boltParallelism": 5,
+    "policyDefaultParallelism": 5,
+    "boltLoadUpbound": 0.8,
+    "topologyLoadUpbound": 0.8,
+    "numOfAlertBoltsPerTopology": 5,
+    "zkConfig": {
+      "zkQuorum": "localhost:2181",
+      "zkRoot": "/alert",
+      "zkSessionTimeoutMs": 10000,
+      "connectionTimeoutMs": 10000,
+      "zkRetryTimes": 3,
+      "zkRetryInterval": 3000
+    }
+    "metadataService": {
+      "host": "localhost",
+      "port": 8080,
+      "context": "/rest"
+    }
+    "metadataDynamicCheck": {
+      "initDelayMillis": 1000,
+      "delayMillis": 30000
+    },
+    "kafkaProducer": {
+      "bootstrapServers": "localhost:9092"
+    },
+    "email": {
+      "sender": "ea...@eagle.com",
+      "recipients": "t...@eagle.com",
+      "mailSmtpHost": "test.eagle.com",
+      "mailSmtpPort": "25"
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties
index d4bc126..53f091e 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/log4j.properties
@@ -12,9 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 log4j.rootLogger=INFO, stdout
-
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
index 8678fe6..ef697f8 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
@@ -1,19 +1,19 @@
 [
-{
-       "name": "network_syslog_datasource",
-       "type": "KAFKA",
-       "properties": {
-       },
-       "topic": "logoutput",
-       "schemeCls": 
"org.apache.eagle.alert.engine.extension.SherlockEventScheme",
-       "codec": {
-               "streamNameSelectorProp": {
-                       "fieldNamesToInferStreamName" : "namespace",
-                       "streamNameFormat":"stream_%s"
-               },
-               
"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-               "timestampColumn": null,
-               "timestampFormat":""
-       }
-}
+  {
+    "name": "network_syslog_datasource",
+    "type": "KAFKA",
+    "properties": {
+    },
+    "topic": "logoutput",
+    "schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "fieldNamesToInferStreamName": "namespace",
+        "streamNameFormat": "stream_%s"
+      },
+      "streamNameSelectorCls": 
"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": null,
+      "timestampFormat": ""
+    }
+  }
 ]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
index d306be2..9e8b0d3 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
@@ -1,26 +1,32 @@
 [
-{
-  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
-  "name":"network-syslog-publish",
-  "policyIds": ["syslog_severity_check_stream_demons1", 
"syslog_severity_check_stream_umpns2"],
-  "dedupIntervalMin": "PT0M",
-  "properties":{
-    "kafka_broker":"127.0.0.1:9092",
-    "topic":"syslog_alerts",
-    "value_deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
-    "value_serializer": 
"org.apache.kafka.common.serialization.ByteArraySerializer"
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name": "network-syslog-publish",
+    "policyIds": [
+      "syslog_severity_check_stream_demons1",
+      "syslog_severity_check_stream_umpns2"
+    ],
+    "dedupIntervalMin": "PT0M",
+    "properties": {
+      "kafka_broker": "127.0.0.1:9092",
+      "topic": "syslog_alerts",
+      "value_deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      "value_serializer": 
"org.apache.kafka.common.serialization.ByteArraySerializer"
+    },
+    "serializer": 
"org.apache.eagle.alert.engine.extension.SherlockAlertSerializer"
   },
-  "serializer" : 
"org.apache.eagle.alert.engine.extension.SherlockAlertSerializer"
-},
-{
-  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
-  "name":"network-syslog-publish_rawjson",
-  "policyIds": ["syslog_severity_check_stream_demons1", 
"syslog_severity_check_stream_umpns2"],
-  "dedupIntervalMin": "PT0M",
-  "properties":{
-    "kafka_broker":"127.0.0.1:9092",
-    "topic":"syslog_alerts_json"
-  },
-  "serializer" : 
"org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
-}
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name": "network-syslog-publish_rawjson",
+    "policyIds": [
+      "syslog_severity_check_stream_demons1",
+      "syslog_severity_check_stream_umpns2"
+    ],
+    "dedupIntervalMin": "PT0M",
+    "properties": {
+      "kafka_broker": "127.0.0.1:9092",
+      "topic": "syslog_alerts_json"
+    },
+    "serializer": 
"org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
index cee6f8c..a4ecb79 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
@@ -1,138 +1,152 @@
 [
-{
-       "streamId": "stream_demons1",
-       "dataSource" : "network_syslog_datasource",
-       "description":"the data stream for syslog events",
-       "validate": false,
-       "timeseries":false,
-       "columns": [
-               {
-                       "name": "dims_facility",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "dims_severity",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "dims_hostname",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "dims_msgid",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "timestamp",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "state",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "interface",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "version",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "type",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "origmsg",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "name",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "namespace",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "epochMillis",
-                       "type" : "LONG",
-                       "defaultValue": 0,
-                       "required": true
-               }
-       ]
-},
-{
-       "streamId": "stream_umpns2",
-       "dataSource" : "network_syslog_datasource",
-       "description":"the data stream for syslog events",
-       "validate": false,
-       "timeseries":false,
-       "columns": [
-               {
-                       "name": "dims_facility",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "dims_severity",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "dims_hostname",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "dims_msgid",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "program",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required":true
-               },{
-                       "name": "message",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "name",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },
-               {
-                       "name": "namespace",
-                       "type" : "STRING",
-                       "defaultValue": "",
-                       "required": true
-               },{
-                       "name": "epochMillis",
-                       "type" : "LONG",
-                       "defaultValue": 0,
-                       "required": true
-               }
-       ]
-}
+  {
+    "streamId": "stream_demons1",
+    "dataSource": "network_syslog_datasource",
+    "description": "the data stream for syslog events",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "dims_facility",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_hostname",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_msgid",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "state",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "interface",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "version",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "type",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "origmsg",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "epochMillis",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      }
+    ]
+  },
+  {
+    "streamId": "stream_umpns2",
+    "dataSource": "network_syslog_datasource",
+    "description": "the data stream for syslog events",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "dims_facility",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_hostname",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_msgid",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "program",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "message",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "epochMillis",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      }
+    ]
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
index 7b1732f..4b53233 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
@@ -1,31 +1,31 @@
 [
-{
-       "name": "alertUnitTopology_1",
-       "numOfSpout":1,
-       "numOfGroupBolt": 4,
-       "numOfAlertBolt": 10,
-       "spoutId": "alertEngineSpout",
-       "groupNodeIds" : [
-               "streamRouterBolt0",
-               "streamRouterBolt1",
-               "streamRouterBolt2",
-               "streamRouterBolt3"
-       ],
-       "alertBoltIds": [
-               "alertBolt0",
-               "alertBolt1",
-               "alertBolt2",
-               "alertBolt3",
-               "alertBolt4",
-               "alertBolt5",
-               "alertBolt6",
-               "alertBolt7",
-               "alertBolt8",
-               "alertBolt9"
-       ],
-       "pubBoltId" : "alertPublishBolt",
-       "spoutParallelism": 1,
-       "groupParallelism": 1,
-       "alertParallelism": 1
-}
+  {
+    "name": "alertUnitTopology_1",
+    "numOfSpout": 1,
+    "numOfGroupBolt": 4,
+    "numOfAlertBolt": 10,
+    "spoutId": "alertEngineSpout",
+    "groupNodeIds": [
+      "streamRouterBolt0",
+      "streamRouterBolt1",
+      "streamRouterBolt2",
+      "streamRouterBolt3"
+    ],
+    "alertBoltIds": [
+      "alertBolt0",
+      "alertBolt1",
+      "alertBolt2",
+      "alertBolt3",
+      "alertBolt4",
+      "alertBolt5",
+      "alertBolt6",
+      "alertBolt7",
+      "alertBolt8",
+      "alertBolt9"
+    ],
+    "pubBoltId": "alertPublishBolt",
+    "spoutParallelism": 1,
+    "groupParallelism": 1,
+    "alertParallelism": 1
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
index 63be6a9..963d2ad 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
@@ -14,38 +14,38 @@
 # limitations under the License.
 
 {
-       "coordinator" : {
-               "policiesPerBolt" : 5,
-               "boltParallelism" : 5,
-               "policyDefaultParallelism" : 5,
-               "boltLoadUpbound": 0.8,
-               "topologyLoadUpbound" : 0.8,
-               "numOfAlertBoltsPerTopology" : 5,
-               "zkConfig": {
-                       "zkQuorum" : "localhost:2181",
-                       "zkRoot": "/alert",
-                       "zkSessionTimeoutMs" : 10000,
-                       "connectionTimeoutMs" : 5000,
-                       "zkRetryTimes" : 5,
-                       "zkRetryInterval" : 1000
-               },
-               "metadataService": {
-                       "context" : "/api",
-                       "host" : "localhost",
-                       "port" : 8080
-               },
-               "metadataDynamicCheck" : {
-                       "initDelayMillis" : 1000,
-                       "delayMillis" : 30000
-               },
-               "kafkaProducer": {
-                       "bootstrapServers": "localhost:9092"
-               },
-               "email": {
-                       "sender": "ea...@eagle.com",
-                       "recipients": "t...@eagle.com",
-                       "mailSmtpHost": "test.eagle.com",
-                       "mailSmtpPort": "25"
-               }
-       }
+  "coordinator": {
+    "policiesPerBolt": 5,
+    "boltParallelism": 5,
+    "policyDefaultParallelism": 5,
+    "boltLoadUpbound": 0.8,
+    "topologyLoadUpbound": 0.8,
+    "numOfAlertBoltsPerTopology": 5,
+    "zkConfig": {
+      "zkQuorum": "localhost:2181",
+      "zkRoot": "/alert",
+      "zkSessionTimeoutMs": 10000,
+      "connectionTimeoutMs": 5000,
+      "zkRetryTimes": 5,
+      "zkRetryInterval": 1000
+    },
+    "metadataService": {
+      "context": "/api",
+      "host": "localhost",
+      "port": 8080
+    },
+    "metadataDynamicCheck": {
+      "initDelayMillis": 1000,
+      "delayMillis": 30000
+    },
+    "kafkaProducer": {
+      "bootstrapServers": "localhost:9092"
+    },
+    "email": {
+      "sender": "ea...@eagle.com",
+      "recipients": "t...@eagle.com",
+      "mailSmtpHost": "test.eagle.com",
+      "mailSmtpPort": "25"
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
index a9e0010..e2618c2 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
@@ -13,14 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
-
 ############################# Server Basics #############################
-
 # The id of the broker. This must be set to a unique integer for each broker.
 broker.id=0
-
 ############################# Socket Server Settings 
#############################
-
 # The address the socket server listens on. It will get the value returned from
 # java.net.InetAddress.getCanonicalHostName() if not configured.
 #   FORMAT:
@@ -28,44 +24,31 @@ broker.id=0
 #   EXAMPLE:
 #     listeners = PLAINTEXT://your.host.name:9092
 #listeners=PLAINTEXT://:9092
-
 # Hostname and port the broker will advertise to producers and consumers. If 
not set,
 # it uses the value for "listeners" if configured.  Otherwise, it will use the 
value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092
-
 # The number of threads handling network requests
 num.network.threads=3
-
 # The number of threads doing disk I/O
 num.io.threads=8
-
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=102400
-
 # The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer.bytes=102400
-
 # The maximum size of a request that the socket server will accept (protection 
against OOM)
 socket.request.max.bytes=104857600
-
-
 ############################# Log Basics #############################
-
 # A comma separated list of directories under which to store log files
 log.dirs=/tmp/dev-kafka-logs
-
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
 # the brokers.
 num.partitions=1
-
 # The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
 # This value is recommended to be increased for installations with data dirs 
located in RAID array.
 num.recovery.threads.per.data.dir=1
-
 ############################# Log Flush Policy #############################
-
 # Messages are immediately written to the filesystem but by default we only 
fsync() to sync
 # the OS cache lazily. The following configurations control the flush of data 
to disk.
 # There are a few important trade-offs here:
@@ -74,42 +57,31 @@ num.recovery.threads.per.data.dir=1
 #    3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
 # The settings below allow one to configure the flush policy to flush data 
after a period of time or
 # every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
-
 # The number of messages to accept before forcing a flush of data to disk
 #log.flush.interval.messages=10000
-
 # The maximum amount of time a message can sit in a log before we force a flush
 #log.flush.interval.ms=1000
-
 ############################# Log Retention Policy 
#############################
-
 # The following configurations control the disposal of log segments. The 
policy can
 # be set to delete segments after a period of time, or after a given size has 
accumulated.
 # A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
 # from the end of the log.
-
 # The minimum age of a log file to be eligible for deletion
 log.retention.hours=168
-
 # A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
 # segments don't drop below log.retention.bytes.
 #log.retention.bytes=1073741824
-
 # The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
 log.segment.bytes=1073741824
-
 # The interval at which log segments are checked to see if they can be deleted 
according
 # to the retention policies
 log.retention.check.interval.ms=300000
-
 ############################# Zookeeper #############################
-
 # Zookeeper connection string (see zookeeper docs for details).
 # This is a comma separated list of host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
 zookeeper.connect=localhost:2181
-
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=6000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
index 4fe471c..04a8cba 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.tools;
 
+import org.apache.eagle.alert.config.ZKConfig;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -23,8 +24,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.alert.config.ZKConfig;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +34,10 @@ public class KafkaConsumerOffsetFetcher {
     public ObjectMapper mapper;
     public String zkPathToPartition;
 
-    public KafkaConsumerOffsetFetcher(ZKConfig config, String ... parameters) {
+    public KafkaConsumerOffsetFetcher(ZKConfig config, String... parameters) {
         try {
             this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, 
config.zkSessionTimeoutMs, 15000,
-                    new RetryNTimes(config.zkRetryTimes, 
config.zkRetryInterval));
+                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
             curator.start();
             this.zkRoot = config.zkRoot;
             mapper = new ObjectMapper();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
index b78e233..e547a25 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
@@ -17,12 +17,16 @@
 package org.apache.eagle.alert.tools;
 
 
-import java.util.*;
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 
+import java.util.*;
+
 public class KafkaLatestOffsetFetcher {
 
     private List<String> brokerList;
@@ -50,7 +54,9 @@ public class KafkaLatestOffsetFetcher {
             String clientName = "Client_" + topic + "_" + partition;
             SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
             long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
-            if (consumer != null) consumer.close();
+            if (consumer != null) {
+                consumer.close();
+            }
             ret.put(partition, latestOffset);
         }
         return ret;
@@ -63,7 +69,7 @@ public class KafkaLatestOffsetFetcher {
         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
         OffsetResponse response = consumer.getOffsetsBefore(request);
         if (response.hasError()) {
-            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition));
         }
         long[] offsets = response.offsets(topic, partition);
         return offsets[0];
@@ -85,11 +91,15 @@ public class KafkaLatestOffsetFetcher {
                         partitionMetadata.put(part.partitionId(), part);
                     }
                 }
-                if (partitionMetadata.size() == partitionCount) break;
+                if (partitionMetadata.size() == partitionCount) {
+                    break;
+                }
             } catch (Exception e) {
                 throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
             } finally {
-                if (consumer != null) consumer.close();
+                if (consumer != null) {
+                    consumer.close();
+                }
             }
         }
         return partitionMetadata;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
index 830e9ac..b89737a 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
@@ -176,9 +176,9 @@ object ProducerTool {
 
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
 
-    if (messageData.size()>0) {
+    if (messageData.size() > 0) {
       reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps)
-    } else if (messageFile.size()>0) {
+    } else if (messageFile.size() > 0) {
       reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps)
     } else {
       reader.init(System.in, cmdLineProps)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
index e4d9f8f..89b2921 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
@@ -43,17 +43,17 @@ public class TestKafkaOffset {
         String topic = "testTopic1";
         String topology = "alertUnitTopology_1";
 
-        while(true) {
+        while (true) {
             KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology);
             String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList");
             KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList);
 
             Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
-            if(consumedOffset.size() == 0){
+            if (consumedOffset.size() == 0) {
                 System.out.println("no any consumer offset found for this topic " + topic);
             }
             Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size());
-            if(latestOffset.size() == 0){
+            if (latestOffset.size() == 0) {
                 System.out.println("no any latest offset found for this topic " + topic);
             }
             for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
@@ -61,7 +61,7 @@ public class TestKafkaOffset {
                 Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
                 Long lag = latestOffset.get(partitionNumber) - entry.getValue();
                 System.out.println(String.format("parition %s, total: %d, consumed: %d, lag: %d",
-                        partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
+                    partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
             }
             Thread.sleep(10000);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
index 7819178..1da6fb1 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"dataSourceConfig":{
-  "kafkaBrokerList" : "kafka-broker:9092",
-  "zkQuorum" : "zk-admin:2181",
-  "transactionZKRoot" : "/consumers/eagle_consumer/%s/%s",
-  "zkSessionTimeoutMs" : 10000,
-  "connectionTimeoutMs" : 10000,
-  "zkRetryTimes" : 3,
-  "zkRetryInterval" : 3000
+"dataSourceConfig": {
+  "kafkaBrokerList": "kafka-broker:9092",
+  "zkQuorum": "zk-admin:2181",
+  "transactionZKRoot": "/consumers/eagle_consumer/%s/%s",
+  "zkSessionTimeoutMs": 10000,
+  "connectionTimeoutMs": 10000,
+  "zkRetryTimes": 3,
+  "zkRetryInterval": 3000
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
index d59ded6..9c6875d 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
@@ -12,9 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 log4j.rootLogger=INFO, stdout
-
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index d2f889b..89728fe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -47,8 +47,8 @@
             <artifactId>jersey-client</artifactId>
         </dependency>
         <!--<dependency>-->
-            <!--<groupId>org.codehaus.jackson</groupId>-->
-            <!--<artifactId>jackson-jaxrs</artifactId>-->
+        <!--<groupId>org.codehaus.jackson</groupId>-->
+        <!--<artifactId>jackson-jaxrs</artifactId>-->
         <!--</dependency>-->
         <dependency>
             <groupId>com.netflix.archaius</groupId>
@@ -80,11 +80,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>com.101tec</groupId>
             <artifactId>zkclient</artifactId>
         </dependency>
@@ -114,10 +109,6 @@
             <artifactId>mapdb</artifactId>
         </dependency>
         <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.velocity</groupId>
             <artifactId>velocity</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
index 7219271..4e3c275 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
@@ -20,8 +20,9 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
  */
 public interface AlertStreamCollector extends Collector<AlertStreamEvent> {
     /**
-     * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method
+     * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method.
      */
     void flush();
+
     void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
index 9a6489c..d1a2b56 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
@@ -19,7 +19,7 @@ package org.apache.eagle.alert.engine;
 @FunctionalInterface
 public interface Collector<T> {
     /**
-     * Must make sure thread-safe
+     * Must make sure thread-safe.
      *
      * @param t
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
index 3a99e98..7310d8b 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
@@ -19,11 +19,11 @@ package org.apache.eagle.alert.engine;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 
 /**
- * Executed in thread-safe trace
+ * Executed in thread-safe trace.
  */
 public interface PartitionedEventCollector extends Collector<PartitionedEvent> {
     /**
-     * @param event to be dropped
+     * @param event to be dropped.
      */
     void drop(PartitionedEvent event);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
index 609378b..a03932f 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -1,7 +1,6 @@
 package org.apache.eagle.alert.engine;
 
 import backtype.storm.metric.api.MultiCountMetric;
-
 import com.typesafe.config.Config;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
index 4b2d68f..d02028a 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine;
 
 import backtype.storm.metric.api.MultiCountMetric;
 import backtype.storm.task.TopologyContext;
-
 import com.typesafe.config.Config;
 
 public class StreamContextImpl implements StreamContext {
@@ -26,7 +25,7 @@ public class StreamContextImpl implements StreamContext {
     private final MultiCountMetric counter;
 
     public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
-        this.counter=counter;
+        this.counter = counter;
         this.config = config;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
index 57529b6..497d908 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -19,28 +19,30 @@
 
 package org.apache.eagle.alert.engine;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
 import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import 
org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
-
 import backtype.storm.generated.StormTopology;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+
 
 /**
- * Since 5/3/16. Make sure unit topology can be started either from command 
line
+ * Make sure unit topology can be started either from command line
  * or from remote A few parameters for starting unit topology 1. number of 
spout
  * tasks 2. number of router bolts 3. number of alert bolts 4. number of 
publish
  * bolts
  *
- * Connections 1. spout and router bolt 2. router bolt and alert bolt 3. alert
- * bolt and publish bolt
+ * <p>Connections 1. spout and router bolt 2. router bolt and alert bolt 3. 
alert
+ * bolt and publish bolt.</p>
+ *
+ * @since 5/3/16.
+ *
  */
 public class UnitTopologyMain {
 
@@ -48,7 +50,7 @@ public class UnitTopologyMain {
         // command line parse
         Options options = new Options();
         options.addOption("c", true,
-                "config URL (valid file name) - defaults application.conf 
according to typesafe config default behavior.");
+            "config URL (valid file name) - defaults application.conf 
according to typesafe config default behavior.");
         CommandLineParser parser = new DefaultParser();
         CommandLine cmd = parser.parse(options, args);
 
@@ -64,7 +66,7 @@ public class UnitTopologyMain {
         ZKMetadataChangeNotifyService changeNotifyService = 
createZKNotifyService(config, topologyId);
         new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
     }
-    
+
     public static void runTopology(Config config, backtype.storm.Config 
stormConfig) {
         // load config and start
         String topologyId = config.getString("topology.name");
@@ -77,7 +79,7 @@ public class UnitTopologyMain {
         ZKMetadataChangeNotifyService changeNotifyService = new 
ZKMetadataChangeNotifyService(zkConfig, topologyId);
         return changeNotifyService;
     }
-    
+
     public static StormTopology createTopology(Config config) {
         String topologyId = config.getString("topology.name");
         ZKMetadataChangeNotifyService changeNotifyService = 
createZKNotifyService(config, topologyId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
index 22fe56a..5aa754e 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
@@ -16,35 +16,31 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import java.io.Closeable;
-import java.io.Serializable;
-
 import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
 import org.apache.eagle.alert.engine.router.SpoutSpecListener;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-
 import com.typesafe.config.Config;
 
+import java.io.Closeable;
+import java.io.Serializable;
+
 /**
  * IMetadataChangeNotifyService defines the following features
  * 1) initialization
  * 2) register metadata change listener
  *
- * In distributed environment for example storm platform,
+ * <p>In distributed environment for example storm platform,
  * subclass implementing this interface should have the following lifecycle
  * 1. object is created in client machine
  * 2. object is serialized and transferred to other machine
  * 3. object is created through deserialization
  * 4. invoke init() method to do initialization
  * 5. invoke various registerListener to get notified of config change
- * 6. invoke close() to release system resource
+ * 6. invoke close() to release system resource</p>
  */
-public interface IMetadataChangeNotifyService extends Closeable,Serializable {
-    /**
-     *
-     * @param config
-     */
+public interface IMetadataChangeNotifyService extends Closeable, Serializable {
+
     void init(Config config, MetadataType type);
 
     void registerListener(SpoutSpecListener listener);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
index 5814ac6..8c493f5 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
@@ -1,8 +1,4 @@
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.IOException;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,6 +14,10 @@ import java.io.IOException;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.IOException;
+
 public class StreamDefinitionNotFoundException extends IOException {
     private static final long serialVersionUID = 6027811718016485808L;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
index 52fcc04..5da4cbd 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
@@ -16,13 +16,6 @@
  */
 package org.apache.eagle.alert.engine.coordinator.impl;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
@@ -34,28 +27,30 @@ import 
org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
 import org.apache.eagle.alert.engine.router.SpoutSpecListener;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * notify 3 components of metadata change Spout, StreamRouterBolt and AlertBolt
+ * notify 3 components of metadata change Spout, StreamRouterBolt and 
AlertBolt.
  */
-@SuppressWarnings({ "serial" })
-public abstract class AbstractMetadataChangeNotifyService implements 
IMetadataChangeNotifyService, Closeable,
-        Serializable {
-    private final static Logger LOG = 
LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class);
+@SuppressWarnings( {"serial"})
+public abstract class AbstractMetadataChangeNotifyService implements 
IMetadataChangeNotifyService, Closeable, Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class);
     private final List<StreamRouterBoltSpecListener> 
streamRouterBoltSpecListeners = new ArrayList<>();
     private final List<SpoutSpecListener> spoutSpecListeners = new 
ArrayList<>();
     private final List<AlertBoltSpecListener> alertBoltSpecListeners = new 
ArrayList<>();
     private final List<AlertPublishSpecListener> alertPublishSpecListeners = 
new ArrayList<>();
     protected MetadataType type;
 
-    /**
-     * @param config
-     */
     @Override
     public void init(Config config, MetadataType type) {
         this.type = type;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index 1b93072..9b31727 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -16,40 +16,32 @@
  */
 package org.apache.eagle.alert.engine.coordinator.impl;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.eagle.alert.config.ConfigBusConsumer;
 import org.apache.eagle.alert.config.ConfigChangeCallback;
 import org.apache.eagle.alert.config.ConfigValue;
 import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
-import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
+import org.apache.eagle.alert.coordination.model.*;
 import org.apache.eagle.alert.engine.coordinator.MetadataType;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * <b>TODO</b>: performance tuning: It is not JVM level service, so it may 
cause
  * zookeeper burden in case of too many listeners This does not support
  * dynamically adding topic, all topics should be available when service object
  * is created.
- * <p>
  * ZK path format is as following:
  * <ul>
  * <li>/alert/topology_1/spout</li>
@@ -80,22 +72,18 @@ public class ZKMetadataChangeNotifyService extends 
AbstractMetadataChangeNotifyS
         LOG.info("init called for client");
     }
 
-    /**
-     * @seeAlso Coordinator
-     * @return
-     */
     private String getMetadataTopicSuffix() {
         switch (type) {
-        case ALERT_BOLT:
-            return "alert";
-        case ALERT_PUBLISH_BOLT:
-            return "publisher";
-        case SPOUT:
-            return "spout";
-        case STREAM_ROUTER_BOLT:
-            return "router";
-        default:
-            throw new RuntimeException(String.format("unexpected metadata 
type: %s !", type));
+            case ALERT_BOLT:
+                return "alert";
+            case ALERT_PUBLISH_BOLT:
+                return "publisher";
+            case SPOUT:
+                return "spout";
+            case STREAM_ROUTER_BOLT:
+                return "router";
+            default:
+                throw new RuntimeException(String.format("unexpected metadata 
type: %s !", type));
         }
     }
 
@@ -107,7 +95,7 @@ public class ZKMetadataChangeNotifyService extends 
AbstractMetadataChangeNotifyS
 
     @Override
     public void onNewConfig(ConfigValue value) {
-        LOG.info("Metadata changed {}",value);
+        LOG.info("Metadata changed {}", value);
 
         if (client == null) {
             LOG.error("OnNewConfig trigger, but metadata service client is 
null. Metadata type {}", type);
@@ -126,42 +114,42 @@ public class ZKMetadataChangeNotifyService extends 
AbstractMetadataChangeNotifyS
         }
         Map<String, StreamDefinition> sds = 
getStreams(state.getStreamSnapshots());
         switch (type) {
-        case ALERT_BOLT:
-            // we might query metadata service query get metadata snapshot and 
StreamDefinition
-            AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId);
-            if (alertSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology 
{} !", version, topologyId);
-            } else {
-                prePopulate(alertSpec, state.getPolicySnapshots());
-                notifyAlertBolt(alertSpec, sds);
-            }
-            break;
-        case ALERT_PUBLISH_BOLT:
-            PublishSpec pubSpec = state.getPublishSpecs().get(topologyId);
-            if (pubSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology 
{} !", version, topologyId);
-            } else {
-                notifyAlertPublishBolt(pubSpec, sds);
-            }
-            break;
-        case SPOUT:
-            SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId);
-            if (spoutSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology 
{} !", version, topologyId);
-            } else {
-                notifySpout(spoutSpec, sds);
-            }
-            break;
-        case STREAM_ROUTER_BOLT:
-            RouterSpec gSpec = state.getGroupSpecs().get(topologyId);
-            if (gSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology 
{} !", version, topologyId);
-            } else {
-                notifyStreamRouterBolt(gSpec, sds);
-            }
-            break;
-        default:
-            LOG.error("unexpected metadata type: {} ", type);
+            case ALERT_BOLT:
+                // we might query metadata service query get metadata snapshot 
and StreamDefinition
+                AlertBoltSpec alertSpec = 
state.getAlertSpecs().get(topologyId);
+                if (alertSpec == null) {
+                    LOG.error(" alert spec for version {} not found for 
topology {} !", version, topologyId);
+                } else {
+                    prePopulate(alertSpec, state.getPolicySnapshots());
+                    notifyAlertBolt(alertSpec, sds);
+                }
+                break;
+            case ALERT_PUBLISH_BOLT:
+                PublishSpec pubSpec = state.getPublishSpecs().get(topologyId);
+                if (pubSpec == null) {
+                    LOG.error(" alert spec for version {} not found for 
topology {} !", version, topologyId);
+                } else {
+                    notifyAlertPublishBolt(pubSpec, sds);
+                }
+                break;
+            case SPOUT:
+                SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId);
+                if (spoutSpec == null) {
+                    LOG.error(" alert spec for version {} not found for 
topology {} !", version, topologyId);
+                } else {
+                    notifySpout(spoutSpec, sds);
+                }
+                break;
+            case STREAM_ROUTER_BOLT:
+                RouterSpec gSpec = state.getGroupSpecs().get(topologyId);
+                if (gSpec == null) {
+                    LOG.error(" alert spec for version {} not found for 
topology {} !", version, topologyId);
+                } else {
+                    notifyStreamRouterBolt(gSpec, sds);
+                }
+                break;
+            default:
+                LOG.error("unexpected metadata type: {} ", type);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
index 4d69bca..d90fd9c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
@@ -62,7 +62,7 @@ public class CompositePolicyHandler implements 
PolicyStreamHandler {
 
     @Override
     public void send(StreamEvent event) throws Exception {
-//        policyHandler.send(event);
+        // policyHandler.send(event);
         send(event, 0);
     }
 
@@ -71,8 +71,9 @@ public class CompositePolicyHandler implements 
PolicyStreamHandler {
         if (handlers.size() > idx) {
             handlers.get(idx).send(event);
         } else if (event instanceof AlertStreamEvent) {
-            if (LOG.isDebugEnabled())
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("Emit new alert event: {}", event);
+            }
             collector.emit((AlertStreamEvent) event); // for alert stream 
events, emit if no handler found.
         } else {
             // nothing found. LOG, and throw exception

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
index 22f1408..40351ad 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
@@ -1,12 +1,4 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,6 +14,14 @@ import 
org.apache.eagle.alert.engine.coordinator.StreamDefinition;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+import java.util.List;
+import java.util.Map;
+
 public interface PolicyChangeListener {
     void onPolicyChange(List<PolicyDefinition> added,
                         List<PolicyDefinition> removed,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
index dd3ca24..e970ddd 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.evaluator;
 
-import java.io.Serializable;
-
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 
+import java.io.Serializable;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -31,11 +31,11 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
  * Step 3: nextEvent
  * Step 4: close
  */
-public interface PolicyGroupEvaluator extends PolicyChangeListener, 
Serializable{
+public interface PolicyGroupEvaluator extends PolicyChangeListener, 
Serializable {
     void init(StreamContext context, AlertStreamCollector collector);
 
     /**
-     * Evaluate event
+     * Evaluate event.
      */
     void nextEvent(PartitionedEvent event);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
index 335b237..49f7eed 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -1,10 +1,4 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,6 +14,13 @@ import 
org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import backtype.storm.metric.api.MultiCountMetric;
+import com.typesafe.config.Config;
+
 public class PolicyHandlerContext {
     private PolicyDefinition policyDefinition;
     private PolicyGroupEvaluator policyEvaluator;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
index 069d321..7b457b0 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
@@ -1,10 +1,4 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,8 +14,16 @@ import org.apache.eagle.alert.engine.model.StreamEvent;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
 public interface PolicyStreamHandler {
     void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext 
context) throws Exception;
+
     void send(StreamEvent event) throws Exception;
+
     void close() throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 1e7aacc..116f633 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
@@ -27,6 +25,8 @@ import 
org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * TODO/FIXME: to support multiple stage definition in single policy. The 
methods in this class is not good to understand now.(Hard code of 0/1).
  */
@@ -42,8 +42,8 @@ public class PolicyStreamHandlers {
         if (SIDDHI_ENGINE.equals(definition.getType())) {
             return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 
         } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
-               // no data for an entire stream won't trigger gap alert  (use 
local time & batch window instead)
-               return new NoDataPolicyTimeBatchHandler(sds);
+            // no data for an entire stream won't trigger gap alert  (use 
local time & batch window instead)
+            return new NoDataPolicyTimeBatchHandler(sds);
         } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
             return new AbsencePolicyHandler(sds);
         } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
index 3b4aba8..f53139f 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -22,8 +22,8 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 
 /**
+ * this assumes that event comes in time order.
  * Since 7/7/16.
- * this assumes that event comes in time order
  */
 public class AbsenceAlertDriver {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbsenceAlertDriver.class);
@@ -38,7 +38,7 @@ public class AbsenceAlertDriver {
 
     public boolean process(List<Object> appearAttrs, long occurTime) {
         // initialize window
-        if(processor == null){
+        if (processor == null) {
             processor = nextProcessor(occurTime);
             LOG.info("initialized a new window {}", processor);
         }
@@ -46,8 +46,8 @@ public class AbsenceAlertDriver {
         AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
         boolean expired = processor.checkExpired();
         boolean isAbsenceAlert = false;
-        if (expired){
-            if(status == AbsenceWindowProcessor.OccurStatus.absent){
+        if (expired) {
+            if (status == AbsenceWindowProcessor.OccurStatus.absent) {
                 // send alert
                 LOG.info("===================");
                 LOG.info("|| Absence Alert ||");
@@ -63,7 +63,8 @@ public class AbsenceAlertDriver {
     }
 
     /**
-     * calculate absolute time range based on current timestamp
+     * calculate absolute time range based on current timestamp.
+     *
      * @param currTime milliseconds
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
index ed50280..db4be7c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
@@ -20,7 +20,7 @@ package org.apache.eagle.alert.engine.evaluator.absence;
  * Since 7/7/16.
  */
 public class AbsenceDailyRule implements AbsenceRule {
-    public static final long DAY_MILLI_SECONDS = 86400*1000L;
+    public static final long DAY_MILLI_SECONDS = 86400 * 1000L;
     public long startOffset;
     public long endOffset;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
index 826a69d..c372411 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -32,29 +32,29 @@ import java.util.*;
 
 /**
  * Since 7/6/16.
- *  * policy would be like:
+ * * policy would be like:
  * {
- "name": "absenceAlertPolicy",
- "description": "absenceAlertPolicy",
- "inputStreams": [
- "absenceAlertStream"
- ],
- "outputStreams": [
- "absenceAlertStream_out"
- ],
- "definition": {
- "type": "absencealert",
- "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
- },
- "partitionSpec": [
- {
- "streamId": "absenceAlertStream",
- "type": "GROUPBY",
- "columns" : ["jobID"]
- }
- ],
- "parallelismHint": 2
- }
+ * "name": "absenceAlertPolicy",
+ * "description": "absenceAlertPolicy",
+ * "inputStreams": [
+ * "absenceAlertStream"
+ * ],
+ * "outputStreams": [
+ * "absenceAlertStream_out"
+ * ],
+ * "definition": {
+ * "type": "absencealert",
+ * "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ * },
+ * "partitionSpec": [
+ * {
+ * "streamId": "absenceAlertStream",
+ * "type": "GROUPBY",
+ * "columns" : ["jobID"]
+ * }
+ * ],
+ * "parallelismHint": 2
+ * }
  */
 public class AbsencePolicyHandler implements PolicyStreamHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbsencePolicyHandler.class);
@@ -66,7 +66,7 @@ public class AbsencePolicyHandler implements 
PolicyStreamHandler {
     private volatile List<Object> expectValues = new ArrayList<>();
     private AbsenceAlertDriver driver;
 
-    public AbsencePolicyHandler(Map<String, StreamDefinition> sds){
+    public AbsencePolicyHandler(Map<String, StreamDefinition> sds) {
         this.sds = sds;
     }
 
@@ -77,11 +77,13 @@ public class AbsencePolicyHandler implements 
PolicyStreamHandler {
         this.policyDef = context.getPolicyDefinition();
         List<String> inputStreams = policyDef.getInputStreams();
         // validate inputStreams has to contain only one stream
-        if(inputStreams.size() != 1)
+        if (inputStreams.size() != 1) {
             throw new IllegalArgumentException("policy inputStream size has to 
be 1 for absence alert");
+        }
         // validate outputStream has to contain only one stream
-        if(policyDef.getOutputStreams().size() != 1)
+        if (policyDef.getOutputStreams().size() != 1) {
             throw new IllegalArgumentException("policy outputStream size has 
to be 1 for absence alert");
+        }
 
         String is = inputStreams.get(0);
         StreamDefinition sd = sds.get(is);
@@ -94,17 +96,17 @@ public class AbsencePolicyHandler implements 
PolicyStreamHandler {
         int offset = 0;
         // populate wisb field names
         int numOfFields = Integer.parseInt(segments[offset++]);
-        for(int i = offset; i < offset+numOfFields; i++){
+        for (int i = offset; i < offset + numOfFields; i++) {
             String fn = segments[i];
             expectFieldIndices.add(sd.getColumnIndex(fn));
         }
         offset += numOfFields;
-        for(int i = offset; i < offset+numOfFields; i++){
+        for (int i = offset; i < offset + numOfFields; i++) {
             String fn = segments[i];
             expectValues.add(fn);
         }
         offset += numOfFields;
-        String absence_window_rule_type = segments[offset++];
+        String absenceWindowRuleType = segments[offset++];
         AbsenceDailyRule rule = new AbsenceDailyRule();
         SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
         sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -120,7 +122,7 @@ public class AbsencePolicyHandler implements 
PolicyStreamHandler {
     public void send(StreamEvent event) throws Exception {
         Object[] data = event.getData();
         List<Object> columnValues = new ArrayList<>();
-        for(int i=0; i<expectFieldIndices.size(); i++){
+        for (int i = 0; i < expectFieldIndices.size(); i++) {
             Object o = data[expectFieldIndices.get(i)];
             // convert value to string
             columnValues.add(o.toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
index 728e702..9958dc7 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
@@ -27,7 +27,7 @@ public class AbsenceWindow {
     public long startTime;
     public long endTime;
 
-    public String toString(){
+    public String toString() {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
         String t1 = sdf.format(new Date(startTime));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
index 6cd0880..dfde09a 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
@@ -21,20 +21,22 @@ package org.apache.eagle.alert.engine.evaluator.absence;
  */
 public class AbsenceWindowGenerator {
     private AbsenceRule rule;
-    public AbsenceWindowGenerator(AbsenceRule rule){
+
+    public AbsenceWindowGenerator(AbsenceRule rule) {
         this.rule = rule;
     }
 
     /**
-     * @param currTime
-     * @return
+     * nextWindow.
+     *
+     * @param currTime current timestamp
      */
-    public AbsenceWindow nextWindow(long currTime){
+    public AbsenceWindow nextWindow(long currTime) {
         AbsenceWindow window = new AbsenceWindow();
-        if(rule instanceof AbsenceDailyRule){
-            AbsenceDailyRule r = (AbsenceDailyRule)rule;
+        if (rule instanceof AbsenceDailyRule) {
+            AbsenceDailyRule r = (AbsenceDailyRule) rule;
             long adjustment = 0; // if today's window already expires, then 
adjust to tomorrow's window
-            if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){
+            if (currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset) 
{
                 adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
             }
             // use current timestamp to round down to day
@@ -43,7 +45,7 @@ public class AbsenceWindowGenerator {
             window.startTime = day + r.startOffset;
             window.endTime = day + r.endOffset;
             return window;
-        }else{
+        } else {
             throw new UnsupportedOperationException("not supported rule " + 
rule);
         }
     }


Reply via email to