[EAGLE-815] Add eagle alert template, severity and category support

Support alert templates for generating human-readable alert messages

# New Features
* Support defining an alert template in PolicyDefinition
* Support generating the alert event content (subject & body, instead of using
AlertPublishEvent, for fewer changes) based on the policy's template, with the
alert event as context
* Dynamically load policy metadata in alert publisher
* Integrate VelocityAlertTemplateEngine into AlertPublisherBolt with the
metadata lifecycle
* Support persisting the alert message in AlertEntity as an immutable field.
* Refactor alert mail template using 
https://github.com/mailgun/transactional-email-templates
* Refactor Alert Template to become simple and human-readable.
* Add Alert Category
* Add Alert Severity (the notification template color changes according to
severity): UNKNOWN (blue), OK (green), WARNING (orange), CRITICAL/FATAL
(dark black)
* Add Alert Subject & Body Template Engine, for example:
    * Sample Event:

          {
              "host": "localhost",
              "timestamp": 1480319108000,
              "metric": "hadoop.cpu.usage",
              "component": "namenode",
              "site": "test2",
              "value": 0.96
          }

    * Sample Subject (rendered as “NAMENODE JMX Metric Alert” for the sample event above) is defined by:

           $component.toUpperCase() JMX Metric Alert

    * Sample Body is defined as:

            An alert happened on <strong>$component</strong> (<strong>$host</strong>) of cluster <strong>$site</strong> at <strong>$ALERT_TIME</strong> because <span style="color: red">$metric = $value</span>

* Add VelocityTemplateParser
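
A minimal sketch of how the sample event above could be merged into a body template. This is not the VelocityAlertTemplateEngine added by this commit: `SimpleAlertTemplate` is a hypothetical, standard-library-only stand-in that understands plain `$name` references only, so method calls such as `$component.toUpperCase()` are outside its scope.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Eagle and not Velocity: it substitutes
// plain $name references and nothing else (no method calls, no directives).
final class SimpleAlertTemplate {
    private static final Pattern REFERENCE =
        Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)");

    // Replaces each $name with its context value; names missing from the
    // context are left in the output exactly as written.
    static String render(String template, Map<String, Object> context) {
        Matcher matcher = REFERENCE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = context.get(matcher.group(1));
            String replacement = value == null ? matcher.group() : String.valueOf(value);
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Fields taken from the sample event in the commit message.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("host", "localhost");
        event.put("metric", "hadoop.cpu.usage");
        event.put("component", "namenode");
        event.put("site", "test2");
        event.put("value", 0.96);

        String body = "An alert happened on $component ($host) of cluster $site "
            + "because $metric = $value";
        System.out.println(render(body, event));
        // prints: An alert happened on namenode (localhost) of cluster test2 because hadoop.cpu.usage = 0.96
    }
}
```

The real engine evaluates full Velocity syntax; the stand-in only illustrates the event-as-context idea.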

# TODO
* Integrate the Velocity parser into the policy validator to make sure the
template contains only `global variable` and `alert stream schema fields`.
* Add alert definition preview
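
One way the validation item in this TODO list might be approached, sketched with the standard library only. `TemplateReferenceCheck` and `unknownReferences` are hypothetical names, the regular expression covers only the `$name` and `${name}` forms (a simplification of Velocity's reference syntax), and the allowed set below is an assumption built from the sample event fields plus `ALERT_TIME`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical validator sketch, not the VelocityTemplateParser from this commit.
final class TemplateReferenceCheck {
    // Matches $name and ${name}; quiet references ($!name) are not handled.
    private static final Pattern REFERENCE =
        Pattern.compile("\\$\\{?([A-Za-z_][A-Za-z0-9_]*)");

    // Returns the referenced names that are not in the allowed set,
    // in the order they first appear in the template.
    static Set<String> unknownReferences(String template, Set<String> allowed) {
        Set<String> unknown = new LinkedHashSet<>();
        Matcher matcher = REFERENCE.matcher(template);
        while (matcher.find()) {
            String name = matcher.group(1);
            if (!allowed.contains(name)) {
                unknown.add(name);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        // Assumed allowed names: stream schema fields plus one global variable.
        Set<String> allowed = new HashSet<>(Arrays.asList(
            "host", "timestamp", "metric", "component", "site", "value", "ALERT_TIME"));
        String subject = "$component.toUpperCase() JMX Metric Alert on $rack";
        System.out.println(unknownReferences(subject, allowed)); // prints [rack]
    }
}
```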

Author: Hao Chen <h...@apache.org>

Closes #711 from haoch/EAGLE-815.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1c81c086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1c81c086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1c81c086

Branch: refs/heads/master
Commit: 1c81c0865c70b9a0a91b86d7813be923e86e1dba
Parents: e5e215e
Author: Hao Chen <h...@apache.org>
Authored: Mon Dec 5 14:12:49 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Mon Dec 5 14:12:49 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/app/AlertEagleStorePlugin.java  |  14 +-
 .../engine/coordinator/AlertDefinition.java     |  72 +++
 .../alert/engine/coordinator/AlertSeverity.java |  21 +
 .../engine/coordinator/PolicyDefinition.java    |  18 +
 .../alert/engine/model/AlertPublishEvent.java   |  81 ++-
 .../alert/engine/model/AlertStreamEvent.java    |  60 ++-
 .../engine/model/AlertPublishEventTest.java     |   2 +-
 .../engine/model/AlertStreamEventTest.java      |  17 +-
 .../interpreter/PolicyExecutionPlanner.java     |   3 +
 .../engine/publisher/AlertStreamFilter.java     |  26 +
 .../engine/publisher/PipeStreamFilter.java      |  46 ++
 .../engine/publisher/PublishConstants.java      |   7 +-
 .../publisher/email/AlertEmailGenerator.java    |  56 ++-
 .../publisher/impl/AlertEmailPublisher.java     |   4 +-
 .../publisher/template/AlertContextFields.java  |  43 ++
 .../publisher/template/AlertTemplateEngine.java |  48 ++
 .../template/AlertTemplateProvider.java         |  23 +
 .../template/VelocityAlertTemplateEngine.java   | 170 +++++++
 .../template/VelocityTemplateParser.java        |  95 ++++
 .../alert/engine/runner/AlertPublisherBolt.java | 128 +++--
 .../src/main/resources/ALERT_DEFAULT.vm         | 301 -----------
 .../main/resources/ALERT_DEFAULT_TEMPLATE.vm    | 301 +++++++++++
 .../main/resources/ALERT_INLINED_TEMPLATE.vm    | 259 ++++++++++
 .../src/main/resources/ALERT_LIGHT_TEMPLATE.vm  | 495 +++++++++++++++++++
 .../publisher/AlertPublisherTestHelper.java     |  19 +-
 .../dedup/DefaultDedupWithoutStateTest.java     |   2 +-
 .../dedup/DefaultDeduplicatorTest.java          |   2 +-
 .../VelocityAlertTemplateEngineTest.java        | 135 +++++
 .../template/VelocityTemplateParserTest.java    |  65 +++
 .../template/VelocityTemplateTest.java          |  95 ++++
 .../app/messaging/KafkaStreamProvider.java      |  35 +-
 .../eagle/app/messaging/KafkaStreamSource.java  |   7 +-
 .../app/messaging/KafkaStreamSourceConfig.java  |   4 +-
 .../eagle/metadata/model/AlertEntity.java       |  21 +
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |  50 +-
 .../eagle/metric/SendSampleDataToKafka.java     |  15 +-
 .../resources/hadoop_jmx_metric_sample.json     |   2 +-
 .../test/resources/integrate_test_policy.json   |  37 ++
 .../history/storm/SparkHistoryJobParseBolt.java |   2 +-
 .../partials/alert/policyEdit/advancedMode.html |  25 +-
 .../app/dev/public/js/ctrls/alertEditCtrl.js    |   6 +
 41 files changed, 2386 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
index a534012..30d2b78 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
@@ -80,14 +80,18 @@ public class AlertEagleStorePlugin extends 
AbstractPublishPlugin {
         Map<String, String> tags = new HashMap<>();
         tags.put(POLICY_ID_KEY, event.getPolicyId());
         tags.put(ALERT_ID_KEY, event.getAlertId());
-        if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
-            tags.put(SITE_ID_KEY, 
event.getExtraData().get(SITE_ID_KEY).toString());
-            
alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString());
-            alertEvent.setAppIds((List<String>) 
event.getExtraData().get(APP_IDS_KEY));
+        tags.put(ALERT_CATEGORY, event.getCategory());
+        tags.put(ALERT_SEVERITY, event.getSeverity().toString());
+        if (event.getContext() != null && !event.getContext().isEmpty()) {
+            tags.put(SITE_ID_KEY, 
event.getContext().get(SITE_ID_KEY).toString());
+            
alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
+            alertEvent.setAppIds((List<String>) 
event.getContext().get(APP_IDS_KEY));
         }
         alertEvent.setTimestamp(event.getCreatedTime());
         alertEvent.setAlertData(event.getDataMap());
+        alertEvent.setAlertSubject(event.getSubject());
+        alertEvent.setAlertBody(event.getBody());
         alertEvent.setTags(tags);
         return alertEvent;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
new file mode 100644
index 0000000..66579bb
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+public class AlertDefinition {
+    private TemplateType templateType = TemplateType.TEXT;
+    private String subject;
+    private String body;
+
+    private AlertSeverity severity;
+    private String category;
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(String templateResource) {
+        this.body = templateResource;
+    }
+
+    public TemplateType getTemplateType() {
+        return templateType;
+    }
+
+    public void setTemplateType(TemplateType type) {
+        this.templateType = type;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public AlertSeverity getSeverity() {
+        return severity;
+    }
+
+    public void setSeverity(AlertSeverity severity) {
+        this.severity = severity;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public enum TemplateType {
+        TEXT,
+        // FILE,
+        // HTTP
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
new file mode 100644
index 0000000..0d36231
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+public enum AlertSeverity {
+    UNKNOWN, OK, WARNING, CRITICAL, FATAL
+}
\ No newline at end of file
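
Editor's sketch of the severity-to-color mapping described in the commit message. The `SeverityColors` class is hypothetical, the enum is a local copy so the example compiles on its own, and the color strings are just the names used in the commit message, not the values in the actual templates.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical lookup table; the real templates may encode colors differently.
final class SeverityColors {
    // Local copy of the enum added in this commit.
    enum AlertSeverity { UNKNOWN, OK, WARNING, CRITICAL, FATAL }

    private static final Map<AlertSeverity, String> COLORS =
        new EnumMap<>(AlertSeverity.class);

    static {
        COLORS.put(AlertSeverity.UNKNOWN, "blue");
        COLORS.put(AlertSeverity.OK, "green");
        COLORS.put(AlertSeverity.WARNING, "orange");
        // CRITICAL and FATAL share one color in the commit message.
        COLORS.put(AlertSeverity.CRITICAL, "black");
        COLORS.put(AlertSeverity.FATAL, "black");
    }

    static String colorOf(AlertSeverity severity) {
        return COLORS.get(severity);
    }

    public static void main(String[] args) {
        System.out.println(colorOf(AlertSeverity.WARNING)); // prints orange
    }
}
```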

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 94d84f2..3663670 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -39,6 +39,7 @@ public class PolicyDefinition implements Serializable {
     private Definition definition;
     private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
+    private AlertDefinition alertDefinition;
 
     // one stream only have one partition in one policy, since we don't 
support stream alias
     private List<StreamPartition> partitionSpec = new 
ArrayList<StreamPartition>();
@@ -170,6 +171,22 @@ public class PolicyDefinition implements Serializable {
         return false;
     }
 
+    public AlertDefinition getAlertDefinition() {
+        return alertDefinition;
+    }
+
+    public void setAlertDefinition(AlertDefinition alertDefinition) {
+        this.alertDefinition = alertDefinition;
+    }
+
+    public AlertSeverity getAlertSeverity() {
+        return alertDefinition == null ? null : alertDefinition.getSeverity();
+    }
+
+    public String getAlertCategory() {
+        return alertDefinition == null ? null : alertDefinition.getCategory();
+    }
+
     @JsonIgnoreProperties(ignoreUnknown = true)
     public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;
@@ -273,6 +290,7 @@ public class PolicyDefinition implements Serializable {
         ENABLED, DISABLED
     }
 
+
     @Override
     public String toString() {
         return String.format("{name=\"%s\",definition=%s}", this.getName(), 
this.getDefinition() == null ? "null" : this.getDefinition().toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
index aee0ba0..a794e49 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
@@ -24,6 +24,9 @@ import org.apache.eagle.common.DateTimeUtil;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Use as final rich alert event.
+ */
 public class AlertPublishEvent {
     private String alertId;
     private String siteId;
@@ -32,12 +35,19 @@ public class AlertPublishEvent {
     private String policyValue;
     private long alertTimestamp;
     private Map<String, Object> alertData;
+    private String alertSubject;
+    private String alertBody;
+    private String streamId;
+    private String createdBy;
+    private long createdTime;
 
     public static final String ALERT_ID_KEY = "alertId";
     public static final String SITE_ID_KEY = "siteId";
     public static final String APP_IDS_KEY = "appIds";
     public static final String POLICY_ID_KEY = "policyId";
     public static final String POLICY_VALUE_KEY = "policyValue";
+    public static final String ALERT_CATEGORY = "category";
+    public static final String ALERT_SEVERITY = "severity";
 
     public String getAlertId() {
         return alertId;
@@ -102,10 +112,21 @@ public class AlertPublishEvent {
         alertEvent.setAlertId(event.getAlertId());
         alertEvent.setPolicyId(event.getPolicyId());
         alertEvent.setAlertTimestamp(event.getCreatedTime());
-        if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
-            
alertEvent.setSiteId(event.getExtraData().get(SITE_ID_KEY).toString());
-            
alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString());
-            alertEvent.setAppIds((List<String>) 
event.getExtraData().get(APP_IDS_KEY));
+        alertEvent.setStreamId(event.getStreamId());
+        alertEvent.setCreatedBy(event.getCreatedBy());
+        alertEvent.setCreatedTime(event.getCreatedTime());
+        alertEvent.setAlertSubject(event.getSubject());
+        alertEvent.setAlertBody(event.getBody());
+        if (event.getContext() != null && !event.getContext().isEmpty()) {
+            if (event.getContext().containsKey(SITE_ID_KEY)) {
+                
alertEvent.setSiteId(event.getContext().get(SITE_ID_KEY).toString());
+            }
+            if (event.getContext().containsKey(POLICY_VALUE_KEY)) {
+                
alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
+            }
+            if (event.getContext().containsKey(APP_IDS_KEY)) {
+                alertEvent.setAppIds((List<String>) 
event.getContext().get(APP_IDS_KEY));
+            }
         }
         alertEvent.setAlertData(event.getDataMap());
         return alertEvent;
@@ -113,11 +134,51 @@ public class AlertPublishEvent {
 
     public String toString() {
         return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, 
alertData=%s",
-            DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
-            DateTimeUtil.CURRENT_TIME_ZONE.getID(),
-            alertId,
-            siteId,
-            policyId,
-            alertData.toString());
+                
DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
+                DateTimeUtil.CURRENT_TIME_ZONE.getID(),
+                alertId,
+                siteId,
+                policyId,
+                alertData.toString());
+    }
+
+    public String getAlertSubject() {
+        return alertSubject;
+    }
+
+    public void setAlertSubject(String alertSubject) {
+        this.alertSubject = alertSubject;
+    }
+
+    public String getAlertBody() {
+        return alertBody;
+    }
+
+    public void setAlertBody(String alertBody) {
+        this.alertBody = alertBody;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public String getCreatedBy() {
+        return createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index b7f0132..50512b1 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.engine.model;
 
+import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -35,8 +36,18 @@ public class AlertStreamEvent extends StreamEvent {
     private StreamDefinition schema;
     private String createdBy;
     private long createdTime;
-    // app related fields
-    private Map<String, Object> extraData;
+    private String category;
+    private AlertSeverity severity = AlertSeverity.WARNING;
+
+    // ----------------------
+    // Lazy Alert Fields
+    // ----------------------
+
+    // Dynamical context like app related fields
+    private Map<String, Object> context;
+    // Alert content like subject and body
+    private String subject;
+    private String body;
 
     public AlertStreamEvent() {
     }
@@ -72,9 +83,10 @@ public class AlertStreamEvent extends StreamEvent {
                 dataStrings.add(null);
             }
         }
-        return 
String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, 
createdBy=%s, metaVersion=%s]",
+
+        return String.format("Alert {stream=%S,timestamp=%s,data=%s, 
policyId=%s, createdBy=%s, metaVersion=%s}",
                 this.getStreamId(), 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-                StringUtils.join(dataStrings, ","), this.getPolicyId(), 
this.getCreatedBy(), this.getMetaVersion());
+                this.getDataMap(), this.getPolicyId(), this.getCreatedBy(), 
this.getMetaVersion());
     }
 
     public String getCreatedBy() {
@@ -114,12 +126,12 @@ public class AlertStreamEvent extends StreamEvent {
         return event;
     }
 
-    public Map<String, Object> getExtraData() {
-        return extraData;
+    public Map<String, Object> getContext() {
+        return context;
     }
 
-    public void setExtraData(Map<String, Object> extraData) {
-        this.extraData = extraData;
+    public void setContext(Map<String, Object> context) {
+        this.context = context;
     }
 
     public String getAlertId() {
@@ -132,4 +144,36 @@ public class AlertStreamEvent extends StreamEvent {
             this.alertId = UUID.randomUUID().toString();
         }
     }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(String body) {
+        this.body = body;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public AlertSeverity getSeverity() {
+        return severity;
+    }
+
+    public void setSeverity(AlertSeverity severity) {
+        this.severity = severity;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
index 01eaa3c..5903ffd 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
@@ -95,7 +95,7 @@ public class AlertPublishEventTest {
         extraData.put(AlertPublishEvent.SITE_ID_KEY, "SITE_ID_KEY");
         extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, "POLICY_VALUE_KEY");
         extraData.put(AlertPublishEvent.APP_IDS_KEY, Arrays.asList("appId1", 
"appId2"));
-        alertStreamEvent.setExtraData(extraData);
+        alertStreamEvent.setContext(extraData);
 
         alertPublishEvent = 
AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
         Assert.assertEquals("SITE_ID_KEY", alertPublishEvent.getSiteId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
index eec3675..beaa8fa 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
@@ -47,14 +47,21 @@ public class AlertStreamEventTest {
         AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
         alertStreamEvent.setSchema(streamDefinition);
         alertStreamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 
10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
-        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 
00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\",
 \"Value\":\"175636480\"},1], policyId=null, createdBy=null, 
metaVersion=null]", alertStreamEvent.toString());
-        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, 
host=hostvalue, salary=-0.2, value=10, int=1, 
object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", 
alertStreamEvent.getDataMap().toString());
+        Assert.assertEquals(
+                "Alert {stream=NULL,timestamp=1970-01-01 
00:00:00,000,data={flag=1, data=0.1, name=namevalue, host=hostvalue, 
salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", 
\"Value\":\"175636480\"}}, policyId=null, createdBy=null, metaVersion=null}",
+                alertStreamEvent.toString());
+        Assert.assertEquals(
+                "{flag=1, data=0.1, name=namevalue, host=hostvalue, 
salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", 
\"Value\":\"175636480\"}}",
+                alertStreamEvent.getDataMap().toString());
 
         AlertStreamEvent alertStreamEvent1 = new 
AlertStreamEvent(alertStreamEvent);
 
-        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 
00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\",
 \"Value\":\"175636480\"},1], policyId=null, createdBy=null, 
metaVersion=null]", alertStreamEvent1.toString());
-        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, 
host=hostvalue, salary=-0.2, value=10, int=1, 
object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", 
alertStreamEvent1.getDataMap().toString());
-
+        Assert.assertEquals(
+                "Alert {stream=NULL,timestamp=1970-01-01 
00:00:00,000,data={flag=1, data=0.1, name=namevalue, host=hostvalue, 
salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", 
\"Value\":\"175636480\"}}, policyId=null, createdBy=null, metaVersion=null}",
+                alertStreamEvent1.toString());
+        Assert.assertEquals(
+                "{flag=1, data=0.1, name=namevalue, host=hostvalue, 
salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", 
\"Value\":\"175636480\"}}",
+                alertStreamEvent1.getDataMap().toString());
 
         Assert.assertFalse(alertStreamEvent1 == alertStreamEvent);
         Assert.assertTrue(alertStreamEvent1.equals(alertStreamEvent));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
index 9e8f9f1..b8e5e42 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.alert.engine.interpreter;
 
+/**
+ * Keep PolicyExecutionPlanner as simple and fast as possible (avoid any 
backend data exchanging).
+ */
 interface PolicyExecutionPlanner {
     /**
      * @return PolicyExecutionPlan.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
new file mode 100644
index 0000000..71c2a8e
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+public interface AlertStreamFilter {
+    /**
+     * Filter Stream Event, if skipped, return null.
+     */
+    AlertStreamEvent filter(AlertStreamEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
new file mode 100644
index 0000000..a6cc3e5
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipeStreamFilter implements AlertStreamFilter {
+
+    private final List<AlertStreamFilter> filters;
+
+    public PipeStreamFilter(AlertStreamFilter... filters) {
+        this.filters = new ArrayList<>();
+        for (AlertStreamFilter filter : filters) {
+            this.filters.add(filter);
+        }
+    }
+
+    @Override
+    public AlertStreamEvent filter(AlertStreamEvent event) {
+        AlertStreamEvent current = event;
+        for (AlertStreamFilter filter : this.filters) {
+            if (current == null) {
+                return null;
+            }
+            current = filter.filter(current);
+        }
+        return current;
+    }
+}
\ No newline at end of file
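
The short-circuit in PipeStreamFilter.filter can be sketched in isolation as below. This is a minimal illustration, not code from the commit: StringFilter, PipeSketch and PipeSketchDemo are hypothetical names, and String stands in for AlertStreamEvent so the block compiles on its own.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for AlertStreamFilter; String replaces AlertStreamEvent.
interface StringFilter {
    String filter(String event);
}

// Mirrors the loop in PipeStreamFilter: filters run in order, and a null
// result from any filter stops the chain and is returned to the caller.
final class PipeSketch implements StringFilter {
    private final List<StringFilter> filters;

    PipeSketch(StringFilter... filters) {
        this.filters = Arrays.asList(filters);
    }

    @Override
    public String filter(String event) {
        String current = event;
        for (StringFilter f : filters) {
            if (current == null) {
                return null; // an earlier filter skipped the event
            }
            current = f.filter(current);
        }
        return current;
    }
}

final class PipeSketchDemo {
    public static void main(String[] args) {
        StringFilter upper = e -> e.toUpperCase();
        StringFilter dropDebug = e -> e.startsWith("DEBUG") ? null : e;
        StringFilter tag = e -> "[alert] " + e;
        StringFilter pipe = new PipeSketch(upper, dropDebug, tag);

        System.out.println(pipe.filter("cpu high"));    // passes all three filters
        System.out.println(pipe.filter("debug noise")); // dropped by the second filter
    }
}
```

Because the null check happens before each filter call, later filters never need to guard against a null event themselves.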

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index 46cce29..e716fbe 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -51,11 +51,14 @@ public class PublishConstants {
     public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
     public static final String ALERT_EMAIL_ORIGIN_PROPERTY = 
"alertEmailOrigin";
 
-    public static final String ALERT_EMAIL_MESSAGE = "alertMessage";
+    public static final String ALERT_EMAIL_SUBJECT = "alertSubject";
+    public static final String ALERT_EMAIL_BODY = "alertBody";
     public static final String ALERT_EMAIL_STREAM_ID = "streamId";
-    public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
+    public static final String ALERT_EMAIL_TIME = "alertTime";
     public static final String ALERT_EMAIL_POLICY_ID = "policyId";
     public static final String ALERT_EMAIL_ALERT_ID = "alertId";
+    public static final String ALERT_EMAIL_ALERT_CATEGORY = "alertCategory";
+    public static final String ALERT_EMAIL_ALERT_SEVERITY = "alertSeverity";
     public static final String ALERT_EMAIL_ALERT_DATA = "alertData";
     public static final String ALERT_EMAIL_ALERT_DATA_DESC = "alertDataDesc";
     public static final String ALERT_EMAIL_CREATOR = "creator";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 809bb09..8aaf310 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -22,10 +22,9 @@ package org.apache.eagle.alert.engine.publisher.email;
 
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
-
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.common.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +64,13 @@ public class AlertEmailGenerator {
         Map<String, String> alertContext = buildAlertContext(event);
         email.setAlertContext(alertContext);
         email.setVelocityTplFile(tplFile);
-        email.setSubject(subject);
+        if (event.getCategory() != null) {
+            email.setSubject(String.format("[Eagle Alert][%s][%s] %s",
+                event.getSeverity(), event.getCategory(), event.getSubject() 
!= null ? event.getSubject() : subject));
+        } else {
+            email.setSubject(String.format("[Eagle Alert][%s] %s",
+                event.getSeverity(), event.getSubject() != null ? 
event.getSubject() : subject));
+        }
         email.setSender(sender);
         email.setRecipients(recipients);
         email.setCc(cc);
@@ -94,11 +99,12 @@ public class AlertEmailGenerator {
         return status;
     }
 
-    /**
-     * TODO Support template-based alert message.
-     */
-    private String renderAlertMessage(AlertStreamEvent event) {
-        return String.format("Alert policy \"%s\" was triggered: 
%s",event.getPolicyId(), generateAlertDataDesc(event));
+    private String getAlertBody(AlertStreamEvent event) {
+        if (event.getBody() == null) {
+            return String.format("Alert policy \"%s\" was triggered: %s", 
event.getPolicyId(), generateAlertDataDesc(event));
+        } else {
+            return event.getBody();
+        }
     }
 
     private String generateAlertDataDesc(AlertStreamEvent event) {
@@ -106,7 +112,7 @@ public class AlertEmailGenerator {
             return "N/A";
         }
         StringBuilder sb = new StringBuilder();
-        for (Map.Entry<String,Object> entry : event.getDataMap().entrySet()) {
+        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
             
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
         return sb.toString();
@@ -114,31 +120,45 @@ public class AlertEmailGenerator {
 
     private Map<String, String> buildAlertContext(AlertStreamEvent event) {
         Map<String, String> alertContext = new HashMap<>();
-        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, 
renderAlertMessage(event));
+
+        if (event.getContext() != null) {
+            for (Map.Entry<String, Object> entry : 
event.getContext().entrySet()) {
+                if (entry.getValue() == null) {
+                    alertContext.put(entry.getKey(), "N/A");
+                } else {
+                    alertContext.put(entry.getKey(), 
entry.getValue().toString());
+                }
+            }
+        }
+
+        alertContext.put(PublishConstants.ALERT_EMAIL_SUBJECT, 
event.getSubject());
+        alertContext.put(PublishConstants.ALERT_EMAIL_BODY, 
getAlertBody(event));
         alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_ID, 
event.getPolicyId());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_ID, 
event.getAlertId());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA, 
event.getDataMap().toString());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, 
generateAlertDataDesc(event));
-        alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, 
event.getCategory());
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, 
event.getSeverity().toString());
+        alertContext.put(PublishConstants.ALERT_EMAIL_TIME, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
         alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, 
event.getStreamId());
         alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, 
event.getCreatedBy());
         alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, 
Version.version);
 
-        String rootUrl = this.getServerPort() == 80 ? 
String.format("http://%s",this.getServerHost())
-            : String.format("http://%s:%s",this.getServerHost(), 
this.getServerPort());
+        String rootUrl = this.getServerPort() == 80 ? 
String.format("http://%s", this.getServerHost())
+            : String.format("http://%s:%s", this.getServerHost(), 
this.getServerPort());
         try {
             alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
-                String.format("%s/#/alert/detail/%s", rootUrl, 
URIUtil.encodeQuery(event.getAlertId(),"UTF-8")));
+                String.format("%s/#/alert/detail/%s", rootUrl, 
URIUtil.encodeQuery(event.getAlertId(), "UTF-8")));
             alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
-                String.format("%s/#/policy/detail/%s",rootUrl, 
URIUtil.encodeQuery(event.getPolicyId(),"UTF-8")));
+                String.format("%s/#/policy/detail/%s", rootUrl, 
URIUtil.encodeQuery(event.getPolicyId(), "UTF-8")));
         } catch (URIException e) {
-            LOG.warn(e.getMessage(),e);
+            LOG.warn(e.getMessage(), e);
             alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
                 String.format("%s/#/alert/detail/%s", rootUrl, 
event.getAlertId()));
             alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
-                String.format("%s/#/policy/detail/%s",rootUrl, 
event.getPolicyId()));
+                String.format("%s/#/policy/detail/%s", rootUrl, 
event.getPolicyId()));
         }
-        alertContext.put(PublishConstants.ALERT_EMAIL_HOME_URL,rootUrl);
+        alertContext.put(PublishConstants.ALERT_EMAIL_HOME_URL, rootUrl);
         return alertContext;
     }
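
The subject and context handling changed in AlertEmailGenerator above can be sketched in isolation as follows. This is a hedged illustration only: EmailSketch, subject and flatten are hypothetical helpers, and the severity is passed as a plain String here, whereas the commit reads it from event.getSeverity().

```java
import java.util.HashMap;
import java.util.Map;

final class EmailSketch {
    // Hypothetical helper mirroring the subject logic: the event subject wins
    // over the configured fallback, and the category tag is added only when set.
    static String subject(String severity, String category, String eventSubject, String fallback) {
        String text = eventSubject != null ? eventSubject : fallback;
        if (category != null) {
            return String.format("[Eagle Alert][%s][%s] %s", severity, category, text);
        }
        return String.format("[Eagle Alert][%s] %s", severity, text);
    }

    // Hypothetical helper mirroring the context-copy loop: values are
    // stringified and null values are replaced with "N/A".
    static Map<String, String> flatten(Map<String, Object> context) {
        Map<String, String> out = new HashMap<>();
        if (context != null) {
            for (Map.Entry<String, Object> e : context.entrySet()) {
                out.put(e.getKey(), e.getValue() == null ? "N/A" : e.getValue().toString());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(subject("CRITICAL", "HDFS", "NAMENODE JMX Metric Alert", "default subject"));
        System.out.println(subject("WARNING", null, null, "default subject"));

        Map<String, Object> ctx = new HashMap<>();
        ctx.put("host", "localhost");
        ctx.put("site", null);
        System.out.println(flatten(ctx));
    }
}
```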
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 7431d35..d81ec2a 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -158,7 +158,9 @@ public class AlertEmailPublisher extends 
AbstractPublishPlugin {
     private AlertEmailGenerator createEmailGenerator(Map<String, Object> 
notificationConfig) {
         String tplFileName = (String) 
notificationConfig.get(PublishConstants.TEMPLATE);
         if (tplFileName == null || tplFileName.equals("")) {
-            tplFileName = "ALERT_DEFAULT.vm";
+            // tplFileName = "ALERT_DEFAULT_TEMPLATE.vm";
+            // tplFileName = "ALERT_LIGHT_TEMPLATE.vm";
+            tplFileName = "ALERT_INLINED_TEMPLATE.vm";
         }
         String subject = (String) 
notificationConfig.get(PublishConstants.SUBJECT);
         if (subject == null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
new file mode 100644
index 0000000..9f85952
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class AlertContextFields {
+    public static final String STREAM_ID = "STREAM_ID";
+    public static final String ALERT_ID = "ALERT_ID";
+    public static final String CREATED_BY = "CREATED_BY";
+    public static final String POLICY_ID = "POLICY_ID";
+    public static final String CREATED_TIMESTAMP = "CREATED_TIMESTAMP";
+    public static final String CREATED_TIME = "CREATED_TIME";
+    public static final String ALERT_TIMESTAMP = "ALERT_TIMESTAMP";
+    public static final String ALERT_TIME = "ALERT_TIME";
+    public static final String ALERT_SCHEMA = "ALERT_SCHEMA";
+    public static final String ALERT_EVENT = "ALERT_EVENT";
+    public static final String POLICY_DESC = "POLICY_DESC";
+    public static final String POLICY_TYPE = "POLICY_TYPE";
+    public static final String POLICY_DEFINITION = "POLICY_DEFINITION";
+    public static final String POLICY_HANDLER = "POLICY_HANDLER";
+
+    public static List<String> getAllContextFields() {
+        return Arrays.asList(
+            STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, 
CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, ALERT_EVENT, POLICY_DESC, 
POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER
+        );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
new file mode 100644
index 0000000..760ec7c
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+import java.util.Collection;
+
+/**
+ * Alert Template Engine.
+ */
+public interface AlertTemplateEngine extends AlertStreamFilter {
+    /**
+     * Initialize AlertTemplateEngine with Config.
+     */
+    void init(Config config);
+
+    /**
+     * Register policy with definition.
+     */
+    void register(PolicyDefinition policyDefinition);
+
+    /**
+     * Unregister policy by policyId.
+     */
+    void unregister(String policyId);
+
+    /**
+     * @return registered policy definitions.
+     */
+    Collection<PolicyDefinition> getPolicies();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
new file mode 100644
index 0000000..875facb
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+public class AlertTemplateProvider {
+    public static AlertTemplateEngine createAlertTemplateEngine() {
+        return new VelocityAlertTemplateEngine();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
new file mode 100644
index 0000000..a019ca6
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
+import org.apache.velocity.runtime.resource.util.StringResourceRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
+    private static final String ALERT_BODY_TPL_PREFIX = "AlertBodyTemplate";
+    private static final String ALERT_SUBJECT_TPL_PREFIX = 
"AlertSubjectTemplate";
+    private static final Logger LOG = 
LoggerFactory.getLogger(VelocityAlertTemplateEngine.class);
+    private StringResourceRepository stringResourceRepository;
+    private Map<String, PolicyDefinition> policyDefinitionRepository;
+    private VelocityEngine engine;
+
+
+    @Override
+    public void init(Config config) {
+        engine = new VelocityEngine();
+        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, 
"org.apache.velocity.runtime.log.Log4JLogChute");
+        engine.setProperty("runtime.log.logsystem.log4j.logger", 
LOG.getName());
+        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
+        engine.addProperty("string.resource.loader.class", 
StringResourceLoader.class.getName());
+        engine.addProperty("string.resource.loader.repository.static", 
"false");
+        engine.init();
+
+        stringResourceRepository = (StringResourceRepository) 
engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
+        policyDefinitionRepository = new HashMap<>();
+    }
+
+    private String getAlertBodyTemplateName(String policyId) {
+        return String.format("%s:%s", ALERT_BODY_TPL_PREFIX, policyId);
+    }
+
+    private String getAlertSubjectTemplateName(String policyId) {
+        return String.format("%s:%s", ALERT_SUBJECT_TPL_PREFIX, policyId);
+    }
+
+    @Override
+    public synchronized void register(PolicyDefinition policyDefinition) {
+        LOG.info("Registering {}", policyDefinition.getName());
+        Preconditions.checkNotNull(policyDefinition.getName(), "policyId is 
null");
+        AlertDefinition alertDefinition = 
policyDefinition.getAlertDefinition();
+        if (alertDefinition == null) {
+            LOG.warn("Subject template of policy {} is null, using policy name 
by default", policyDefinition.getName());
+            
stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()),
 policyDefinition.getName());
+
+            LOG.warn("Body template of policy {} is null, using $ALERT_EVENT 
by default", policyDefinition.getName());
+            String defaultAlertBodyTmpl = String.format("Message: $%s 
(Auto-generated alert message as template not defined in policy %s)",
+                AlertContextFields.ALERT_EVENT, policyDefinition.getName());
+            
stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()),
 defaultAlertBodyTmpl);
+        } else if 
(alertDefinition.getTemplateType().equals(AlertDefinition.TemplateType.TEXT)) {
+            if (alertDefinition.getSubject() != null) {
+                
stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()),
 alertDefinition.getSubject());
+            } else {
+                LOG.warn("Subject template of policy {} is null, using policy 
name by default", policyDefinition.getName());
+                
stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()),
 policyDefinition.getName());
+            }
+            if (alertDefinition.getBody() != null) {
+                
stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()),
 alertDefinition.getBody());
+            } else {
+                LOG.warn("Body template of policy {} is null, using 
ALERT_EVENT by default", policyDefinition.getName());
+                
stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()),
 "$" + AlertContextFields.ALERT_EVENT);
+            }
+        } else {
+            throw new IllegalArgumentException("Unsupported alert template 
type " + alertDefinition.getTemplateType());
+        }
+        policyDefinitionRepository.put(policyDefinition.getName(), 
policyDefinition);
+    }
+
+    @Override
+    public synchronized void unregister(String policyId) {
+        LOG.info("Unregistering {}", policyId);
+        
stringResourceRepository.removeStringResource(getAlertBodyTemplateName(policyId));
+        
stringResourceRepository.removeStringResource(getAlertSubjectTemplateName(policyId));
+        policyDefinitionRepository.remove(policyId);
+    }
+
+    @Override
+    public synchronized AlertStreamEvent filter(AlertStreamEvent event) {
+        
Preconditions.checkArgument(this.policyDefinitionRepository.containsKey(event.getPolicyId()),
 "Unknown policyId " + event.getPolicyId());
+        PolicyDefinition policyDefinition = 
this.policyDefinitionRepository.get(event.getPolicyId());
+        StringWriter bodyWriter = new StringWriter();
+        StringWriter subjectWriter = new StringWriter();
+        try {
+            VelocityContext alertContext = buildAlertContext(policyDefinition, 
event);
+            Template template = 
engine.getTemplate(getAlertBodyTemplateName(event.getPolicyId()));
+            template.merge(alertContext, bodyWriter);
+            event.setBody(bodyWriter.toString());
+
+            template = 
engine.getTemplate(getAlertSubjectTemplateName(event.getPolicyId()));
+            template.merge(alertContext, subjectWriter);
+            event.setSubject(subjectWriter.toString());
+        } finally {
+            try {
+                bodyWriter.close();
+            } catch (IOException e) {
+                LOG.warn(e.getMessage(), e);
+            }
+            try {
+                subjectWriter.close();
+            } catch (IOException e) {
+                LOG.warn(e.getMessage(), e);
+            }
+        }
+        return event;
+    }
+
+    @Override
+    public synchronized Collection<PolicyDefinition> getPolicies() {
+        return policyDefinitionRepository.values();
+    }
+
+    private static VelocityContext buildAlertContext(PolicyDefinition 
policyDefinition, AlertStreamEvent event) {
+        VelocityContext context = new VelocityContext();
+        context.put(AlertContextFields.STREAM_ID, event.getStreamId());
+        context.put(AlertContextFields.ALERT_ID, event.getAlertId());
+        context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
+        context.put(AlertContextFields.CREATED_TIMESTAMP, 
event.getCreatedTime());
+        context.put(AlertContextFields.CREATED_TIME, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
+        context.put(AlertContextFields.ALERT_TIME, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
+        context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
+        context.put(AlertContextFields.ALERT_EVENT, event);
+
+        context.put(AlertContextFields.POLICY_ID, policyDefinition.getName());
+        context.put(AlertContextFields.POLICY_DESC, 
policyDefinition.getDescription());
+        context.put(AlertContextFields.POLICY_TYPE, 
policyDefinition.getDefinition().getType());
+        context.put(AlertContextFields.POLICY_DEFINITION, 
policyDefinition.getDefinition().getValue());
+        context.put(AlertContextFields.POLICY_HANDLER, 
policyDefinition.getDefinition().getHandlerClass());
+
+        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
+            context.put(entry.getKey(), entry.getValue());
+        }
+        return context;
+    }
+}
\ No newline at end of file
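
One consequence of buildAlertContext worth noting: the event's data map is copied in last, so a stream field whose name matches a built-in key such as POLICY_ID would replace the built-in value. The sketch below illustrates that ordering with a plain Map standing in for VelocityContext, assuming that a later put replaces an earlier value for the same key. ContextOrderSketch and build are hypothetical names, not part of the commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ContextOrderSketch {
    // Plain Map standing in for VelocityContext: built-in fields are put first,
    // then every entry of the event data map, so data fields win on collisions.
    static Map<String, Object> build(String policyId, Map<String, Object> dataMap) {
        Map<String, Object> context = new LinkedHashMap<>();
        context.put("POLICY_ID", policyId);
        for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
            context.put(entry.getKey(), entry.getValue());
        }
        return context;
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("host", "localhost");
        data.put("POLICY_ID", "value-from-stream"); // collides with the built-in key

        Map<String, Object> context = build("cpuUsagePolicy", data);
        System.out.println(context.get("POLICY_ID")); // the stream value, not the policy name
        System.out.println(context.get("host"));
    }
}
```

This is one reason the TODO about validating templates against global variables and stream schema fields matters: a validator could also reject stream schemas that reuse a reserved context name.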

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
new file mode 100644
index 0000000..a824a0d
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.template;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.exception.MethodInvocationException;
+import org.apache.velocity.exception.ParseErrorException;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.parser.node.ASTReference;
+import org.apache.velocity.runtime.parser.node.ASTprocess;
+import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
+import org.apache.velocity.runtime.resource.util.StringResourceRepository;
+import org.apache.velocity.runtime.visitor.NodeViewMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class VelocityTemplateParser {
+    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateParser.class);
+    private static final String TEMPLATE_NAME = "template";
+    private final Template template;
+    private final ParserNodeVisitor visitor;
+
+    public VelocityTemplateParser(String templateString) throws ParseErrorException {
+        VelocityEngine engine = new VelocityEngine();
+        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
+        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
+        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
+        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
+        engine.addProperty("string.resource.loader.repository.static", "false");
+        engine.addProperty("runtime.references.strict", "true");
+        engine.init();
+        StringResourceRepository resourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
+        resourceRepository.putStringResource(TEMPLATE_NAME, templateString);
+        template = engine.getTemplate(TEMPLATE_NAME);
+        ASTprocess data = (ASTprocess) template.getData();
+        visitor = new ParserNodeVisitor();
+        data.jjtAccept(visitor, null);
+    }
+
+    public List<String> getReferenceNames() {
+        return this.visitor.getReferenceNames();
+    }
+
+    public Template getTemplate() {
+        return template;
+    }
+
+    /**
+     * @throws MethodInvocationException if required variable is missing in context.
+     */
+    public void validateContext(Map<String, Object> context) throws MethodInvocationException {
+        VelocityContext velocityContext = new VelocityContext();
+        for (Map.Entry<String, Object> entry : context.entrySet()) {
+            velocityContext.put(entry.getKey(), entry.getValue());
+        }
+        template.merge(velocityContext, new StringWriter());
+    }
+
+    private class ParserNodeVisitor extends NodeViewMode {
+        private List<String> referenceNames = new ArrayList<>();
+
+        @Override
+        public Object visit(ASTReference node, Object data) {
+            referenceNames.add(node.getRootString());
+            return super.visit(node, data);
+        }
+
+        public List<String> getReferenceNames() {
+            return this.referenceNames;
+        }
+    }
+}
\ No newline at end of file
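
Note on the parser above: getReferenceNames() collects the root name of every reference found while walking Velocity's AST. As a rough, dependency-free illustration of what that list might look like for the sample subject and body templates in the commit message, the sketch below approximates the same idea with a regular expression. It is not how VelocityTemplateParser works (it ignores directives, comments, and escapes), and the names TemplateReferenceSketch and referenceNames are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for illustration only; the real parser walks Velocity's AST.
public class TemplateReferenceSketch {
    // Matches $name, $!name, ${name} and $!{name}; captures only the root identifier.
    private static final Pattern REFERENCE =
        Pattern.compile("\\$!?\\{?([A-Za-z_][A-Za-z0-9_]*)");

    // Returns root reference names in order of appearance, duplicates included.
    public static List<String> referenceNames(String template) {
        List<String> names = new ArrayList<>();
        Matcher matcher = REFERENCE.matcher(template);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    public static void main(String[] args) {
        // Subject template taken from the commit message.
        System.out.println(referenceNames("$component.toUpperCase() JMX Metric Alert"));
        // Shortened form of the body template from the commit message.
        System.out.println(referenceNames(
            "An alert happened on $component ($host) of cluster $site "
                + "at $ALERT_TIME because $metric = $value"));
    }
}
```

A list like this is what the TODO item about policy validation would presumably check against the global variables and the alert stream schema fields.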

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 72eafe4..2b57e96 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -16,7 +16,13 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import org.apache.commons.collections.map.HashedMap;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.*;
@@ -24,16 +30,12 @@ import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
+import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,22 +43,24 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-@SuppressWarnings("serial")
 public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
     private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
     private final AlertPublisher alertPublisher;
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
+    private AlertTemplateEngine alertTemplateEngine;
 
     private boolean logEventEnabled;
     private TopologyContext context;
-    
+    private AlertStreamFilter alertFilter;
+
     public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) {
         super(alertPublisherName, coordinatorService, config);
         this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
-        
+
         if (config != null && config.hasPath("topology.logEventEnabled")) {
             logEventEnabled = config.getBoolean("topology.logEventEnabled");
         }
@@ -69,23 +73,28 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         this.alertPublisher.init(config, stormConf);
         streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context);
         this.context = context;
+        this.alertTemplateEngine = AlertTemplateProvider.createAlertTemplateEngine();
+        this.alertTemplateEngine.init(config);
+        this.alertFilter = new PipeStreamFilter(new AlertContextEnrichFilter(this), new AlertTemplateFilter(alertTemplateEngine));
     }
 
     @Override
     public void execute(Tuple input) {
         try {
-            streamContext.counter().scope("receive_count");
+            streamContext.counter().incr("receive_count");
             PublishPartition partition = (PublishPartition) input.getValueByField(AlertConstants.FIELD_0);
             AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1);
             if (logEventEnabled) {
                 LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
             }
-            wrapAlertPublishEvent(event);
-            alertPublisher.nextEvent(partition, event);
+            AlertStreamEvent filteredEvent = alertFilter.filter(event);
+            if (filteredEvent != null) {
+                alertPublisher.nextEvent(partition, filteredEvent);
+            }
             this.collector.ack(input);
-            streamContext.counter().scope("ack_count");
-        } catch (Exception ex) {
-            streamContext.counter().scope("fail_count");
+            streamContext.counter().incr("ack_count");
+        } catch (Throwable ex) {
+            streamContext.counter().incr("fail_count");
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
         }
@@ -131,30 +140,79 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
 
     @Override
     public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
+        List<String> policyToRemove = new ArrayList<>();
+        if (this.policyDefinitionMap != null) {
+            policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList()));
+        }
+
         this.policyDefinitionMap = pds;
         this.streamDefinitionMap = sds;
+
+        for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
+            try {
+                this.alertTemplateEngine.register(entry.getValue());
+            } catch (Throwable throwable) {
+                LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
+            }
+        }
+
+        for (String policyId : policyToRemove) {
+            try {
+                this.alertTemplateEngine.unregister(policyId);
+            } catch (Throwable throwable) {
+                LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
+            }
+        }
     }
 
-    @SuppressWarnings("unchecked")
-    private void wrapAlertPublishEvent(AlertStreamEvent event) {
-        Map<String, Object> extraData = new HashedMap();
-        List<String> appIds = new ArrayList<>();
-        if (policyDefinitionMap == null || streamDefinitionMap == null) {
-            LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
-            return;
+    private class AlertContextEnrichFilter implements AlertStreamFilter {
+        private final AlertPublisherBolt alertPublisherBolt;
+
+        private AlertContextEnrichFilter(AlertPublisherBolt alertPublisherBolt) {
+            this.alertPublisherBolt = alertPublisherBolt;
         }
-        PolicyDefinition policyDefinition = policyDefinitionMap.get(event.getPolicyId());
-        if (this.policyDefinitionMap != null && policyDefinition != null) {
-            for (String inputStreamId : policyDefinition.getInputStreams()) {
-                StreamDefinition sd = this.streamDefinitionMap.get(inputStreamId);
-                if (sd != null) {
-                    extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
-                    appIds.add(sd.getDataSource());
+
+        /**
+         * TODO: Refactor wrapAlertPublishEvent into alertTemplateEngine and remove extraData from AlertStreamEvent.
+         */
+        @Override
+        public AlertStreamEvent filter(AlertStreamEvent event) {
+            event.ensureAlertId();
+            Map<String, Object> extraData = new HashMap<>();
+            List<String> appIds = new ArrayList<>();
+            if (alertPublisherBolt.policyDefinitionMap == null || alertPublisherBolt.streamDefinitionMap == null) {
+                LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
+            } else {
+                PolicyDefinition policyDefinition = alertPublisherBolt.policyDefinitionMap.get(event.getPolicyId());
+                if (alertPublisherBolt.policyDefinitionMap != null && policyDefinition != null) {
+                    for (String inputStreamId : policyDefinition.getInputStreams()) {
+                        StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId);
+                        if (sd != null) {
+                            extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
+                            appIds.add(sd.getDataSource());
+                        }
+                    }
+                    extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
+                    extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue());
+                    event.setSeverity(policyDefinition.getAlertSeverity());
+                    event.setCategory(policyDefinition.getAlertCategory());
                 }
+                event.setContext(extraData);
             }
-            extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
-            extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue());
+            return event;
+        }
+    }
+
+    private class AlertTemplateFilter implements AlertStreamFilter {
+        private final AlertTemplateEngine alertTemplateEngine;
+
+        private AlertTemplateFilter(AlertTemplateEngine alertTemplateEngine) {
+            this.alertTemplateEngine = alertTemplateEngine;
+        }
+
+        @Override
+        public AlertStreamEvent filter(AlertStreamEvent event) {
+            return this.alertTemplateEngine.filter(event);
         }
-        event.setExtraData(extraData);
     }
 }
\ No newline at end of file
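
Note on onAlertPolicyChange above: the policies to unregister are the ids present in the previous policy map but absent from the new one, computed before the maps are swapped. The following is a minimal standalone sketch of that set difference, assuming plain java.util maps; PolicyDiffSketch and policiesToRemove are hypothetical names and are not part of this commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; mirrors the stream expression in the diff.
public class PolicyDiffSketch {
    // Ids that were known before the update but are missing from the new map.
    public static List<String> policiesToRemove(Map<String, ?> previous, Map<String, ?> current) {
        if (previous == null) {
            // First update: nothing has been registered yet, so nothing to remove.
            return new ArrayList<>();
        }
        return previous.keySet().stream()
            .filter(policyId -> !current.containsKey(policyId))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> previous = new HashMap<>();
        previous.put("cpuPolicy", "v1");
        previous.put("diskPolicy", "v1");
        Map<String, String> current = new HashMap<>();
        current.put("diskPolicy", "v2");
        System.out.println(policiesToRemove(previous, current));
    }
}
```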

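Note on the filter chain in AlertPublisherBolt: execute() passes each event through alertFilter and publishes only when the result is non-null. PipeStreamFilter itself is not shown in this part of the diff, so the sketch below only assumes that a pipe applies its stages in order and stops at the first stage that returns null. EventFilter, PipeFilter, and demo are hypothetical names, not the classes added by this commit.

```java
import java.util.Arrays;
import java.util.List;

public class PipeFilterSketch {
    // Hypothetical counterpart of AlertStreamFilter: returning null drops the event.
    interface EventFilter<T> {
        T filter(T event);
    }

    // Assumed pipe semantics: run stages in order, stop once an event is dropped.
    static final class PipeFilter<T> implements EventFilter<T> {
        private final List<EventFilter<T>> stages;

        @SafeVarargs
        PipeFilter(EventFilter<T>... stages) {
            this.stages = Arrays.asList(stages);
        }

        @Override
        public T filter(T event) {
            T current = event;
            for (EventFilter<T> stage : stages) {
                if (current == null) {
                    return null;
                }
                current = stage.filter(current);
            }
            return current;
        }
    }

    // Two toy stages: trim the text, then drop empty text or upper-case the rest.
    public static String demo(String input) {
        EventFilter<String> pipe = new PipeFilter<String>(
            s -> s.trim(),
            s -> s.isEmpty() ? null : s.toUpperCase());
        return pipe.filter(input);
    }

    public static void main(String[] args) {
        System.out.println(demo("  disk full "));
        System.out.println(demo("   "));
    }
}
```

In the bolt, the first stage plays the role of AlertContextEnrichFilter and the second the role of AlertTemplateFilter.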
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
deleted file mode 100644
index fad3aa9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
+++ /dev/null
@@ -1,301 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
-    <meta name="viewport" content="width=device-width"/>
-    <style>
-        body {
-            width: 100% !important;
-            min-width: 100%;
-            -webkit-text-size-adjust: 100%;
-            -ms-text-size-adjust: 100%;
-            margin: 0;
-            padding: 0;
-        }
-
-        table {
-            border-spacing: 0;
-            border-collapse: collapse;
-        }
-
-        table th,
-        table td {
-            padding: 3px 0 3px 0;
-        }
-
-        .body {
-            width: 100%;
-        }
-
-        p, a, h1, h2, h3, ul, ol, li {
-            font-family: Helvetica, Arial, sans-serif;
-            font-weight: normal;
-            margin: 0;
-            padding: 0;
-        }
-
-        p {
-            font-size: 14px;
-            line-height: 19px;
-        }
-
-        a {
-            color: #3294b1;
-        }
-
-        h1 {
-            font-size: 36px;
-            margin: 15px 0 5px 0;
-        }
-
-        h2 {
-            font-size: 32px;
-        }
-
-        h3 {
-            font-size: 28px;
-        }
-
-        ul, ol {
-            margin: 0 0 0 25px;
-            padding: 0;
-        }
-
-        .btn {
-            background: #2ba6cb !important;
-            border: 1px solid #2284a1;
-            padding: 10px 20px 10px 20px;
-            text-align: center;
-        }
-
-        .btn:hover {
-            background: #2795b6 !important;
-        }
-
-        .btn a {
-            color: #FFFFFF;
-            text-decoration: none;
-            font-weight: bold;
-            padding: 10px 20px 10px 20px;
-        }
-
-        .tableBordered {
-            border-top: 1px solid #b9e5ff;
-        }
-
-        .tableBordered th {
-            background: #ECF8FF;
-        }
-
-        .tableBordered th p {
-            font-weight: bold;
-            color: #3294b1;
-        }
-
-        .tableBordered th,
-        .tableBordered td {
-            color: #333333;
-            border-bottom: 1px solid #b9e5ff;
-            text-align: center;
-            padding-bottom: 5px;
-        }
-
-        .panel {
-            height: 100px;
-        }
-    </style>
-</head>
-<body>
-    #set ( $elem = $alertList[0] )
-<table class="body">
-    <tr>
-        <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
-            <!-- Header -->
-            <table width="580">
-                <tr>
-                    <td style="padding: 0 0 0 0;" align="left">
-                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">Eagle Alert Notification</p>
-                    </td>
-                </tr>
-            </table>
-        </td>
-    </tr>
-
-    <tr>
-        <td align="center" valign="top">
-            <!-- Eagle Body -->
-            <table width="580">
-                <tr>
-                    <!-- Title -->
-                    <td align="center">
-                        <h2>[Alert] $elem["policyId"]</h2>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Time -->
-                    <td>
-                        <table width="580">
-                            <tr>
-                                <td>
-                                    <p><b>Detected Time: $elem["alertTime"]</b></p>
-                                </td>
-                                #set ( $severity = $elem["severity"] )
-                                #if (!$severity || ("$severity" == ""))
-                                    #set ( $elem["severity"] = "WARNING")
-                                #end
-                                <td align="right">
-                                    <p><b>
-                                        Severity:
-                                        #if ($elem["severity"] == "WARNING")
-                                            <span>$elem["severity"]</span>
-                                        #else
-                                            <span style="color: #FF0000;">$elem["severity"]</span>
-                                        #end
-                                    </b></p>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-
-                <tr>
-                    <!-- Basic Information -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Alert Message </b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Description -->
-                    <td valign="top"
-                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 20px;">
-                        <p>$elem["alertMessage"]</p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Basic Information -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Alert Detail</b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Basic Information Content -->
-                    <td>
-                        <table class="tableBordered" width="580">
-                            <tr>
-                                <th>
-                                    <p>Policy Name</p>
-                                </th>
-                                <td>
-                                    <p><a href="$elem["policyDetailUrl"]">$elem["policyId"]</a></p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Severity Level</p>
-                                </th>
-                                <td>
-                                    <p>$elem["severity"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Alert Stream</p>
-                                </th>
-                                <td>
-                                    <p>$elem["streamId"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Created Time</p>
-                                </th>
-                                <td>
-                                    <p>$elem["alertTime"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Created By</p>
-                                </th>
-                                <td>
-                                    <p>$elem["creator"]</p>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-##                <tr>
-##                    <!-- View Detail -->
-##                    <td align="center" style="padding: 10px 0 0 0;">
-##                        <table width="580">
-##                            <tr>
-##                                <td class="btn">
-##                                    <a href="$elem["policyDetailUrl"]">View Policy Details</a>
-##                                </td>
-##                            </tr>
-##                        </table>
-##                    </td>
-##                </tr>
-
-                <tr>
-                    <!-- View Detail -->
-                    <td align="center" style="padding: 10px 0 0 0;">
-                        <table width="580">
-                            <tr>
-                                <td class="btn">
-                                    <a href="$elem["alertDetailUrl"]">View Alert on Eagle</a>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Actions Required -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Actions Required</b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Possible Root Causes Content -->
-                    <td class="panel" valign="top"
-                        style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
-                        <p>
-                            The alert notification was automatically detected and sent by Eagle according to policy: $elem["policyId"].
-                            To follow-up on this, please verify the alert and diagnose the root cause with Eagle:
-                        </p>
-                        <p></p>
-                        <ul>
-                            <li><p><a href="$elem["alertDetailUrl"]">View alert detail</a></p></li>
-                            <li><p><a href="$elem["policyDetailUrl"]">View policy detail</a></p></li>
-                            <li><p><a href="$elem["homeUrl"]">View eagle home</a></p></li>
-                        </ul>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Copyright -->
-                    <td align="center">
-                        <p><i>Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $elem["version"])</i></p>
-                    </td>
-                </tr>
-            </table>
-        </td>
-    </tr>
-</table>
-</body>
-</html>
\ No newline at end of file

