Repository: incubator-eagle
Updated Branches:
  refs/heads/master aeec7f389 -> 70600b260


EAGLE-794: support publish bolt parallelism

Author: Li, Garrett
Reviewer: ralphsu

This closes #687


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/70600b26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/70600b26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/70600b26

Branch: refs/heads/master
Commit: 70600b2609c20c3f1385e60813823abf48c63c03
Parents: aeec7f3
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Fri Nov 25 09:47:41 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Mon Nov 28 20:37:22 2016 +0800

----------------------------------------------------------------------
 .../alert/coordination/model/AlertBoltSpec.java |  22 ++-
 .../engine/coordinator/PublishPartition.java    | 121 +++++++++++++
 .../alert/engine/coordinator/Publishment.java   |  34 ++--
 .../impl/MonitorMetadataGenerator.java          |  18 +-
 .../impl/AlertBoltOutputCollectorWrapper.java   |  54 +++++-
 .../alert/engine/publisher/AlertPublisher.java  |  11 +-
 .../publisher/impl/AbstractPublishPlugin.java   |   1 -
 .../publisher/impl/AlertPublisherImpl.java      | 174 +++++++------------
 .../engine/router/AlertBoltSpecListener.java    |   4 +-
 .../eagle/alert/engine/runner/AlertBolt.java    |  29 +++-
 .../alert/engine/runner/AlertPublisherBolt.java |  19 +-
 .../alert/engine/runner/UnitTopologyRunner.java |   3 +-
 .../alert/engine/router/TestAlertBolt.java      |  13 +-
 .../engine/router/TestAlertPublisherBolt.java   |  19 +-
 .../engine/statecheck/TestStateCheckPolicy.java |  15 +-
 .../src/test/resources/publishments2.json       |   3 +-
 16 files changed, 387 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
index b3adda5..19a803b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
@@ -17,12 +17,17 @@
 package org.apache.eagle.alert.coordination.model;
 
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The alert specification for topology bolts.
@@ -40,6 +45,8 @@ public class AlertBoltSpec {
     // mapping from boltId to list of PolicyDefinition's Ids
     private Map<String, List<String>> boltPolicyIdsMap = new HashMap<String, 
List<String>>();
 
+    private Set<PublishPartition> publishPartitions = new HashSet<>();
+
     public AlertBoltSpec() {
     }
 
@@ -87,6 +94,18 @@ public class AlertBoltSpec {
         }
     }
 
+    public Set<PublishPartition> getPublishPartitions() {
+        return publishPartitions;
+    }
+
+    public void setPublishPartitions(Set<PublishPartition> publishPartitions) {
+        this.publishPartitions = publishPartitions;
+    }
+
+    public void addPublishPartition(String streamId, String policyId, String 
publishId, Set<String> columns) {
+        this.publishPartitions.add(new PublishPartition(streamId, policyId, 
publishId, columns));
+    }
+
     @JsonIgnore
     public Map<String, List<PolicyDefinition>> getBoltPoliciesMap() {
         return boltPoliciesMap;
@@ -107,7 +126,8 @@ public class AlertBoltSpec {
 
     @Override
     public String toString() {
-        return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", 
version, topologyName, boltPolicyIdsMap);
+        return String.format("version:%s-topo:%s, boltPolicyIdsMap %s, 
publishPartitions %s",
+            version, topologyName, boltPolicyIdsMap, 
Joiner.on(",").join(publishPartitions));
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
new file mode 100644
index 0000000..7e57f88
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Objects;
+
+public class PublishPartition implements Serializable {
+
+    private static final long serialVersionUID = 2524776632955586234L;
+
+    private String policyId;
+    private String streamId;
+    private String publishId;
+    private Set<String> columns = new HashSet<>();
+
+    @JsonIgnore
+    private Set<Object> columnValues = new HashSet<>();
+
+    public PublishPartition() {
+    }
+
+    public PublishPartition(String streamId, String policyId, String 
publishId, Set<String> columns) {
+        this.streamId = streamId;
+        this.policyId = policyId;
+        this.publishId = publishId;
+        if (columns != null) {
+            this.columns = columns;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return new 
HashCodeBuilder().append(streamId).append(policyId).append(publishId).append(columns).append(columnValues).build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof PublishPartition
+            && Objects.equal(this.streamId, ((PublishPartition) 
obj).getStreamId())
+            && Objects.equal(this.policyId, ((PublishPartition) 
obj).getPolicyId())
+            && Objects.equal(this.publishId, ((PublishPartition) 
obj).getPublishId())
+            && CollectionUtils.isEqualCollection(this.columns, 
((PublishPartition) obj).getColumns())
+            && CollectionUtils.isEqualCollection(this.columnValues, 
((PublishPartition) obj).getColumnValues());
+    }
+
+    @Override
+    public String toString() {
+        return 
String.format("PublishPartition[policyId=%s,streamId=%s,publishId=%s,columns=%s,columnValues=%s]",
+            policyId, streamId, publishId, columns, columnValues);
+    }
+
+    @Override
+    public PublishPartition clone() {
+        return new PublishPartition(this.streamId, this.policyId, 
this.publishId, new HashSet<>(this.columns));
+    }
+
+    public String getPolicyId() {
+        return policyId;
+    }
+
+    public void setPolicyId(String policyId) {
+        this.policyId = policyId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public String getPublishId() {
+        return publishId;
+    }
+
+    public void setPublishId(String publishId) {
+        this.publishId = publishId;
+    }
+
+    public Set<String> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(Set<String> columns) {
+        this.columns = columns;
+    }
+
+    @JsonIgnore
+    public Set<Object> getColumnValues() {
+        return columnValues;
+    }
+
+    @JsonIgnore
+    public void setColumnValues(Set<Object> columnValues) {
+        this.columnValues = columnValues;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index d1cc33a..74a3d69 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -19,9 +19,11 @@ package org.apache.eagle.alert.engine.coordinator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * @since Apr 11, 2016.
@@ -29,6 +31,8 @@ import java.util.Objects;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Publishment {
 
+    public static final String STREAM_NAME_DEFAULT = "_default";
+
     private String name;
     private String type;
     private List<String> policyIds;
@@ -42,6 +46,16 @@ public class Publishment {
     // the class name to extend the IEventSerializer interface
     private String serializer;
 
+    private Set<String> partitionColumns = new HashSet<>();
+
+    public Set<String> getPartitionColumns() {
+        return partitionColumns;
+    }
+
+    public void setPartitionColumns(Set<String> partitionColumns) {
+        this.partitionColumns = partitionColumns;
+    }
+
     public String getName() {
         return name;
     }
@@ -135,13 +149,13 @@ public class Publishment {
         if (obj instanceof Publishment) {
             Publishment p = (Publishment) obj;
             return (Objects.equals(name, p.getName()) && Objects.equals(type, 
p.getType())
-                    && Objects.equals(dedupIntervalMin, 
p.getDedupIntervalMin())
-                    && Objects.equals(dedupFields, p.getDedupFields())
-                    && Objects.equals(dedupStateField, p.getDedupStateField())
-                    && Objects.equals(overrideDeduplicator, 
p.getOverrideDeduplicator())
-                    && Objects.equals(policyIds, p.getPolicyIds())
-                    && Objects.equals(streamIds, p.getStreamIds())
-                    && properties.equals(p.getProperties()));
+                && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+                && Objects.equals(dedupFields, p.getDedupFields())
+                && Objects.equals(dedupStateField, p.getDedupStateField())
+                && Objects.equals(overrideDeduplicator, 
p.getOverrideDeduplicator())
+                && Objects.equals(policyIds, p.getPolicyIds())
+                && Objects.equals(streamIds, p.getStreamIds())
+                && properties.equals(p.getProperties()));
         }
         return false;
     }
@@ -149,15 +163,15 @@ public class Publishment {
     @Override
     public int hashCode() {
         return new 
HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
-                
.append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
-                .append(properties).build();
+            
.append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
+            .append(properties).build();
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         
sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
-                .append(policyIds).append(",properties:").append(properties);
+            .append(policyIds).append(",properties:").append(properties);
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index 3f64f86..fb20e66 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -38,7 +38,6 @@ import java.util.Map;
 
 /**
  * Given current policy placement, figure out monitor metadata
- *
  * <p>TODO: refactor to eliminate the duplicate of stupid 
if-notInMap-then-create....
  * FIXME: too many duplicated code logic : check null; add list to map; add to 
list..</p>
  *
@@ -136,6 +135,23 @@ public class MonitorMetadataGenerator {
                 for (String policyName : boltUsage.getPolicies()) {
                     PolicyDefinition definition = 
context.getPolicies().get(policyName);
                     alertSpec.addBoltPolicy(boltUsage.getBoltId(), 
definition.getName());
+
+                    for (Publishment publish : 
context.getPublishments().values()) {
+                        if 
(!publish.getPolicyIds().contains(definition.getName())) {
+                            continue;
+                        }
+
+                        List<String> streamIds = new ArrayList<>();
+                        // add the publish to the bolt
+                        if (publish.getStreamIds() == null || 
publish.getStreamIds().size() <= 0) {
+                            streamIds.add(Publishment.STREAM_NAME_DEFAULT);
+                        } else {
+                            streamIds.addAll(publish.getStreamIds());
+                        }
+                        for (String streamId : streamIds) {
+                            alertSpec.addPublishPartition(streamId, 
policyName, publish.getName(), publish.getPartitionColumns());
+                        }
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 042fca7..b1be2da 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,29 +16,57 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import backtype.storm.task.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import backtype.storm.task.OutputCollector;
 
 public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class);
+
     private final OutputCollector delegate;
     private final Object outputLock;
     private final StreamContext streamContext;
 
-    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, 
Object outputLock, StreamContext streamContext) {
+    private volatile Set<PublishPartition> publishPartitions;
+
+    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, 
Object outputLock,
+                                           StreamContext streamContext) {
         this.delegate = outputCollector;
         this.outputLock = outputLock;
         this.streamContext = streamContext;
+
+        this.publishPartitions = new HashSet<>();
     }
 
     @Override
     public void emit(AlertStreamEvent event) {
-        synchronized (outputLock) {
-            streamContext.counter().scope("alert_count").incr();
-            delegate.emit(Arrays.asList(event.getStreamId(), event));
+        Set<PublishPartition> clonedPublishPartitions = new 
HashSet<>(publishPartitions);
+        for (PublishPartition publishPartition : clonedPublishPartitions) {
+            PublishPartition cloned = publishPartition.clone();
+            for (String column : cloned.getColumns()) {
+                int columnIndex = event.getSchema().getColumnIndex(column);
+                if (columnIndex < 0) {
+                    LOG.warn("Column {} is not found in stream {}", column, 
cloned.getStreamId());
+                    continue;
+                }
+                cloned.getColumnValues().add(event.getData()[columnIndex]);
+            }
+
+            synchronized (outputLock) {
+                streamContext.counter().scope("alert_count").incr();
+                delegate.emit(Arrays.asList(cloned, event));
+            }
         }
     }
 
@@ -49,6 +77,16 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
 
     @Override
     public void close() {
+    }
 
+    public synchronized void 
onAlertBoltSpecChange(Collection<PublishPartition> addedPublishPartitions,
+                                                   
Collection<PublishPartition> removedPublishPartitions,
+                                                   
Collection<PublishPartition> modifiedPublishPartitions) {
+        Set<PublishPartition> clonedPublishPartitions = new 
HashSet<>(publishPartitions);
+        clonedPublishPartitions.addAll(addedPublishPartitions);
+        clonedPublishPartitions.removeAll(removedPublishPartitions);
+        clonedPublishPartitions.addAll(modifiedPublishPartitions);
+        publishPartitions = clonedPublishPartitions;
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
index cdd52db..9717e2b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
@@ -16,19 +16,22 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import com.typesafe.config.Config;
-
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+import com.typesafe.config.Config;
+
 public interface AlertPublisher extends AlertPublishListener, Serializable {
     @SuppressWarnings("rawtypes")
     void init(Config config, Map stormConfig);
 
     String getName();
 
-    void nextEvent(AlertStreamEvent event);
+    void nextEvent(PublishPartition partition, AlertStreamEvent event);
 
     void close();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index f68ae52..b155bb8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine.publisher.impl;
 
 import com.google.common.base.Joiner;
 import com.typesafe.config.Config;
-import javafx.scene.control.Alert;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
 import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 4709180..bbb062b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -17,17 +17,15 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
@@ -35,7 +33,6 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
 import com.typesafe.config.Config;
 
 @SuppressWarnings("rawtypes")
@@ -44,12 +41,9 @@ public class AlertPublisherImpl implements AlertPublisher {
     private static final long serialVersionUID = 4809983246198138865L;
     private static final Logger LOG = 
LoggerFactory.getLogger(AlertPublisherImpl.class);
 
-    private static final String STREAM_NAME_DEFAULT = "_default";
-
     private final String name;
 
-    private volatile Map<String, Set<String>> psPublishPluginMapping = new 
ConcurrentHashMap<>(1);
-    private volatile Map<String, AlertPublishPlugin> publishPluginMapping = 
new ConcurrentHashMap<>(1);
+    private volatile Map<PublishPartition, AlertPublishPlugin> 
publishPluginMapping = new ConcurrentHashMap<>(1);
     private Config config;
     private Map conf;
 
@@ -69,42 +63,30 @@ public class AlertPublisherImpl implements AlertPublisher {
     }
 
     @Override
-    public void nextEvent(AlertStreamEvent event) {
+    public void nextEvent(PublishPartition partition, AlertStreamEvent event) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(event.toString());
         }
-        notifyAlert(event);
+        notifyAlert(partition, event);
     }
 
-    private void notifyAlert(AlertStreamEvent event) {
-        String policyId = event.getPolicyId();
-        if (StringUtils.isEmpty(policyId)) {
-            LOG.warn("policyId cannot be null for event to be published");
+    private void notifyAlert(PublishPartition partition, AlertStreamEvent 
event) {
+        // remove the column values for publish plugin match
+        partition.getColumnValues().clear();
+        if (!publishPluginMapping.containsKey(partition)) {
+            LOG.warn("PublishPartition {} is not found in publish plugin map", 
partition);
             return;
         }
-        // use default stream name if specified stream publisher is not found
-        Set<String> pubIds = 
psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId, 
event.getStreamId()));
-        if (pubIds == null) {
-            pubIds = 
psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId));
-        }
-        if (pubIds == null) {
-            LOG.warn("Policy {} Stream {} does *NOT* subscribe any 
publishment!", policyId, event.getStreamId());
+        AlertPublishPlugin plugin = publishPluginMapping.get(partition);
+        if (plugin == null) {
+            LOG.warn("PublishPartition {} has problems while initializing 
publish plugin", partition);
             return;
         }
-        event.ensureAlertId();
-        for (String pubId : pubIds) {
-            @SuppressWarnings("resource")
-            AlertPublishPlugin plugin = pubId != null ? 
publishPluginMapping.get(pubId) : null;
-            if (plugin == null) {
-                LOG.warn("Policy {} does *NOT* subscribe any publishment!", 
policyId);
-                continue;
-            }
-            try {
-                LOG.debug("Execute alert publisher {}", 
plugin.getClass().getCanonicalName());
-                plugin.onAlert(event);
-            } catch (Exception ex) {
-                LOG.error("Fail invoking publisher's onAlert, continue ", ex);
-            }
+        try {
+            LOG.debug("Execute alert publisher {}", 
plugin.getClass().getCanonicalName());
+            plugin.onAlert(event);
+        } catch (Exception ex) {
+            LOG.error("Fail invoking publisher's onAlert, continue ", ex);
         }
     }
 
@@ -137,8 +119,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
 
         // copy and swap to avoid concurrency issue
-        Map<String, Set<String>> newPSPublishPluginMapping = new 
HashMap<>(psPublishPluginMapping);
-        Map<String, AlertPublishPlugin> newPublishMap = new 
HashMap<>(publishPluginMapping);
+        Map<PublishPartition, AlertPublishPlugin> newPublishMap = new 
HashMap<>(publishPluginMapping);
 
         // added
         for (Publishment publishment : added) {
@@ -146,8 +127,9 @@ public class AlertPublisherImpl implements AlertPublisher {
 
             AlertPublishPlugin plugin = 
AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if (plugin != null) {
-                newPublishMap.put(publishment.getName(), plugin);
-                addPublishmentPoliciesStreams(newPSPublishPluginMapping, 
publishment.getPolicyIds(), publishment.getStreamIds(), publishment.getName());
+                for (PublishPartition p : getPublishPartitions(publishment)) {
+                    newPublishMap.put(p, plugin);
+                }
             } else {
                 LOG.error("OnPublishChange alertPublisher {} failed due to 
invalid format", publishment);
             }
@@ -155,94 +137,70 @@ public class AlertPublisherImpl implements AlertPublisher 
{
         //removed
         List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
         for (Publishment publishment : removed) {
-            String pubName = publishment.getName();
-            removePublihsPoliciesStreams(newPSPublishPluginMapping, 
publishment.getPolicyIds(), pubName);
-            toBeClosed.add(newPublishMap.get(pubName));
-            newPublishMap.remove(publishment.getName());
+            AlertPublishPlugin plugin = null;
+            for (PublishPartition p : getPublishPartitions(publishment)) {
+                if (plugin == null) {
+                    plugin = newPublishMap.remove(p);
+                } else {
+                    newPublishMap.remove(p);
+                }
+            }
+            if (plugin != null) {
+                toBeClosed.add(plugin);
+            }
         }
         // updated
-        for (int i = 0; i < afterModified.size(); i++) {
-            String pubName = afterModified.get(i).getName();
-            List<String> newPolicies = afterModified.get(i).getPolicyIds();
-            List<String> newStreams = afterModified.get(i).getStreamIds();
-            List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
-            List<String> oldStreams = beforeModified.get(i).getStreamIds();
-
-            if (!newPolicies.equals(oldPolicies) || !Objects.equal(newStreams, 
oldStreams)) {
-                // since both policy & stream may change, skip the compare and 
difference update
-                removePublihsPoliciesStreams(newPSPublishPluginMapping, 
oldPolicies, pubName);
-                addPublishmentPoliciesStreams(newPSPublishPluginMapping, 
newPolicies, newStreams, pubName);
-            }
-            Publishment newPub = afterModified.get(i);
-
+        for (Publishment publishment : afterModified) {
             // for updated publishment, need to init them too
-            AlertPublishPlugin newPlugin = 
AlertPublishPluginsFactory.createNotificationPlugin(newPub, config, conf);
-            newPublishMap.replace(pubName, newPlugin);
+            AlertPublishPlugin newPlugin = 
AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
+            if (newPlugin != null) {
+                AlertPublishPlugin plugin = null;
+                for (PublishPartition p : getPublishPartitions(publishment)) {
+                    if (plugin == null) {
+                        plugin = newPublishMap.get(p);
+                    }
+                    newPublishMap.put(p, newPlugin);
+                }
+                if (plugin != null) {
+                    toBeClosed.add(plugin);
+                }
+            } else {
+                LOG.error("OnPublishChange alertPublisher {} failed due to 
invalid format", publishment);
+            }
         }
 
         // now do the swap
         publishPluginMapping = newPublishMap;
-        psPublishPluginMapping = newPSPublishPluginMapping;
 
         // safely close : it depend on plugin to check if want to wait all 
data to be flushed.
         closePlugins(toBeClosed);
     }
 
+    /**
+     * Expands a publishment into one {@link PublishPartition} per (stream id, policy id) pair.
+     *
+     * <p>A publishment that declares no stream ids is keyed under
+     * {@code Publishment.STREAM_NAME_DEFAULT}. A publishment whose policy id list is null
+     * yields an empty set instead of throwing a NullPointerException.
+     *
+     * @param publish the publishment to expand
+     * @return the partitions identifying the publishment; never null, possibly empty
+     */
+    private Set<PublishPartition> getPublishPartitions(Publishment publish) {
+        Set<PublishPartition> publishPartitions = new HashSet<>();
+        List<String> policyIds = publish.getPolicyIds();
+        if (policyIds == null) {
+            // no policy subscribes to this publishment, so there is nothing to key it under
+            return publishPartitions;
+        }
+        List<String> streamIds = new ArrayList<>();
+        // add the publish to the bolt
+        if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) {
+            streamIds.add(Publishment.STREAM_NAME_DEFAULT);
+        } else {
+            streamIds.addAll(publish.getStreamIds());
+        }
+        for (String streamId : streamIds) {
+            for (String policyId : policyIds) {
+                publishPartitions.add(new PublishPartition(streamId, policyId, publish.getName(), publish.getPartitionColumns()));
+            }
+        }
+        return publishPartitions;
+    }
+
     private void closePlugins(List<AlertPublishPlugin> toBeClosed) {
         for (AlertPublishPlugin p : toBeClosed) {
             try {
                 p.close();
             } catch (Exception e) {
-                LOG.error(MessageFormat.format("Error when close publish 
plugin {}, {}!", p.getClass().getCanonicalName()), e);
+                // Best effort: a plugin that fails to close must not stop the remaining ones from closing.
+                // The logger fills the {} placeholder itself and reports the trailing Throwable as the
+                // stack trace; String.format only understands %-specifiers and would log a literal "{}".
+                LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e);
             }
         }
     }
 
-    private void addPublishmentPoliciesStreams(Map<String, Set<String>> 
newPSPublishPluginMapping,
-                                               List<String> addedPolicyIds, 
List<String> addedStreamIds, String pubName) {
-        if (addedPolicyIds == null || pubName == null) {
-            return;
-        }
-
-        if (addedStreamIds == null || addedStreamIds.size() <= 0) {
-            addedStreamIds = new ArrayList<String>();
-            addedStreamIds.add(STREAM_NAME_DEFAULT);
-        }
-
-        for (String policyId : addedPolicyIds) {
-            for (String streamId : addedStreamIds) {
-                String psUniqueId = getPolicyStreamUniqueId(policyId, 
streamId);
-                newPSPublishPluginMapping.putIfAbsent(psUniqueId, new 
HashSet<>());
-                newPSPublishPluginMapping.get(psUniqueId).add(pubName);
-            }
-        }
-    }
-
-    private synchronized void removePublihsPoliciesStreams(Map<String, 
Set<String>> newPSPublishPluginMapping,
-                                                           List<String> 
deletedPolicyIds, String pubName) {
-        if (deletedPolicyIds == null || pubName == null) {
-            return;
-        }
-
-        for (String policyId : deletedPolicyIds) {
-            for (Entry<String, Set<String>> entry : 
newPSPublishPluginMapping.entrySet()) {
-                if (entry.getKey().startsWith("policyId:" + policyId)) {
-                    entry.getValue().remove(pubName);
-                    break;
-                }
-            }
-        }
-    }
-
-    private String getPolicyStreamUniqueId(String policyId) {
-        return getPolicyStreamUniqueId(policyId, STREAM_NAME_DEFAULT);
-    }
-
-    private String getPolicyStreamUniqueId(String policyId, String streamId) {
-        if (StringUtils.isBlank(streamId)) {
-            streamId = STREAM_NAME_DEFAULT;
-        }
-        return String.format("policyId:%s,streamId:%s", policyId, streamId);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
index 84345dd..e1f3e9c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
@@ -19,11 +19,11 @@
 
 package org.apache.eagle.alert.engine.router;
 
+import java.util.Map;
+
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
-import java.util.Map;
-
 /**
  * Since 5/2/16.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 639d338..627a218 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -16,18 +16,22 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.coordinator.MetadataType;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
 import 
org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
@@ -61,13 +65,15 @@ public class AlertBolt extends AbstractStreamBolt 
implements AlertBoltSpecListen
     private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
     private static final long serialVersionUID = -4132297691448945672L;
     private PolicyGroupEvaluator policyGroupEvaluator;
-    private AlertStreamCollector alertOutputCollector;
+    private AlertBoltOutputCollectorWrapper alertOutputCollector;
     private String boltId;
     private boolean logEventEnabled;
     private volatile Object outputLock;
     // mapping from policy name to PolicyDefinition
     private volatile Map<String, PolicyDefinition> cachedPolicies = new 
HashMap<>(); // for one streamGroup, there are multiple policies
 
+    private volatile Set<PublishPartition> cachedPublishPartitions = new 
HashSet<>();
+
     private AlertBoltSpec spec;
 
     public AlertBolt(String boltId, Config config, 
IMetadataChangeNotifyService changeNotifyService) {
@@ -174,6 +180,7 @@ public class AlertBolt extends AbstractStreamBolt 
implements AlertBoltSpecListen
         super.cleanup();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, 
Map<String, StreamDefinition> sds) {
         List<PolicyDefinition> newPolicies = 
spec.getBoltPoliciesMap().get(boltId);
@@ -189,6 +196,24 @@ public class AlertBolt extends AbstractStreamBolt 
implements AlertBoltSpecListen
 
         policyGroupEvaluator.onPolicyChange(comparator.getAdded(), 
comparator.getRemoved(), comparator.getModified(), sds);
 
+        // update alert output collector
+        Set<PublishPartition> tempPublishPartitions = new HashSet<>();
+        spec.getPublishPartitions().forEach(p -> {
+            if (newPolicies.stream().filter(o -> 
o.getName().equals(p.getPolicyId())).count() > 0) {
+                tempPublishPartitions.add(p);
+            }
+        });
+
+        Collection<PublishPartition> addedPublishPartitions = 
CollectionUtils.subtract(tempPublishPartitions, cachedPublishPartitions);
+        Collection<PublishPartition> removedPublishPartitions = 
CollectionUtils.subtract(cachedPublishPartitions, tempPublishPartitions);
+        Collection<PublishPartition> modifiedPublishPartitions = 
CollectionUtils.intersection(tempPublishPartitions, cachedPublishPartitions);
+
+        LOG.debug("added PublishPartition " + addedPublishPartitions);
+        LOG.debug("removed PublishPartition " + removedPublishPartitions);
+        LOG.debug("modified PublishPartition " + modifiedPublishPartitions);
+
+        alertOutputCollector.onAlertBoltSpecChange(addedPublishPartitions, 
removedPublishPartitions, modifiedPublishPartitions);
+
         // switch
         cachedPolicies = newPoliciesMap;
         sdf = sds;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 323f682..72eafe4 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -50,9 +50,16 @@ public class AlertPublisherBolt extends AbstractStreamBolt 
implements AlertPubli
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
 
+    private boolean logEventEnabled;
+    private TopologyContext context;
+    
     public AlertPublisherBolt(String alertPublisherName, Config config, 
IMetadataChangeNotifyService coordinatorService) {
         super(alertPublisherName, coordinatorService, config);
         this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
+        
+        if (config != null && config.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        }
     }
 
     @Override
@@ -61,15 +68,20 @@ public class AlertPublisherBolt extends AbstractStreamBolt 
implements AlertPubli
         coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
         this.alertPublisher.init(config, stormConf);
         streamContext = new StreamContextImpl(config, 
context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context);
+        this.context = context;
     }
 
     @Override
     public void execute(Tuple input) {
         try {
             streamContext.counter().scope("receive_count");
+            PublishPartition partition = (PublishPartition) 
input.getValueByField(AlertConstants.FIELD_0);
             AlertStreamEvent event = (AlertStreamEvent) 
input.getValueByField(AlertConstants.FIELD_1);
+            if (logEventEnabled) {
+                LOG.info("Alert publish bolt {}/{} with partition {} received 
event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
+            }
             wrapAlertPublishEvent(event);
-            alertPublisher.nextEvent(event);
+            alertPublisher.nextEvent(partition, event);
             this.collector.ack(input);
             streamContext.counter().scope("ack_count");
         } catch (Exception ex) {
@@ -123,6 +135,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt 
implements AlertPubli
         this.streamDefinitionMap = sds;
     }
 
+    @SuppressWarnings("unchecked")
     private void wrapAlertPublishEvent(AlertStreamEvent event) {
         Map<String, Object> extraData = new HashedMap();
         List<String> appIds = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 3b40afd..287d5db 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -205,8 +205,7 @@ public class UnitTopologyRunner {
         // connect alert bolt and alert publish bolt, this is the last bolt in 
the pipeline
         BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, 
publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks);
         for (int i = 0; i < numOfAlertBolts; i++) {
-            //boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new 
Fields(AlertConstants.FIELD_0));
-            boltDeclarer.shuffleGrouping(alertBoltNamePrefix + i);
+            boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new 
Fields(AlertConstants.FIELD_0));
         }
 
         return builder.createTopology();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 6529f64..e6acc3e 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -28,6 +28,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
@@ -86,9 +87,10 @@ public class TestAlertBolt {
             public List<Integer> emit(String streamId, Collection<Tuple> 
anchors, List<Object> tuple) {
                 alertCount.incrementAndGet();
                 mutex.release();
-                Assert.assertEquals("testAlertStream", tuple.get(0));
+                Assert.assertEquals("testAlertStream", ((PublishPartition) 
tuple.get(0)).getStreamId());
+                Assert.assertEquals("testAlertPublish", ((PublishPartition) 
tuple.get(0)).getPublishId());
                 AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: 
[streamId=[%s], tuple=[%s] ", streamId, tuple));
+                System.out.println(String.format("collector received: 
[streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), 
tuple));
                 return null;
             }
 
@@ -107,6 +109,7 @@ public class TestAlertBolt {
             @Override
             public void reportError(Throwable error) {
             }
+            
         });
         AlertBolt bolt = createAlertBolt(collector);
 
@@ -141,6 +144,7 @@ public class TestAlertBolt {
         pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR 
col1=='value2'] select col1 insert into testAlertStream;";
         spec.addBoltPolicy("alertBolt1", pd.getName());
         spec.getBoltPoliciesMap().put("alertBolt1", new 
ArrayList<PolicyDefinition>(Arrays.asList(pd)));
+        spec.addPublishPartition("testAlertStream", "policy1", 
"testAlertPublish", null);
         bolt.onAlertBoltSpecChange(spec, sds);
 
         // contruct GeneralTopologyContext
@@ -166,7 +170,8 @@ public class TestAlertBolt {
         event2.setData(data);
         event2.setStreamId(streamId);
         PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 
1001);
-
+        
+        Thread.sleep(3000);
         Tuple input = new TupleImpl(context, 
Collections.singletonList(partitionedEvent1), taskId, "default");
         Tuple input2 = new TupleImpl(context, 
Collections.singletonList(partitionedEvent2), taskId, "default");
         bolt.execute(input);
@@ -426,6 +431,8 @@ public class TestAlertBolt {
         StreamDefinition sdTest = new StreamDefinition();
         sdTest.setStreamId(TEST_STREAM);
         sds.put(sdTest.getStreamId(), sdTest);
+        
+        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", 
"testAlertPublish", null);
 
         bolt.onAlertBoltSpecChange(boltSpecs, sds);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index c95cab1..46517fe 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -62,9 +63,11 @@ public class TestAlertPublisherBolt {
         PublishSpec spec = 
MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"),
 PublishSpec.class);
         publisher.onPublishChange(spec.getPublishments(), null, null, null);
         AlertStreamEvent event = create("testAlertStream");
-        publisher.nextEvent(event);
+        publisher.nextEvent(new PublishPartition(event.getStreamId(), 
event.getPolicyId(),
+            spec.getPublishments().get(0).getName(), 
spec.getPublishments().get(0).getPartitionColumns()), event);
         AlertStreamEvent event1 = create("testAlertStream");
-        publisher.nextEvent(event1);
+        publisher.nextEvent(new PublishPartition(event1.getStreamId(), 
event1.getPolicyId(),
+            spec.getPublishments().get(0).getName(), 
spec.getPublishments().get(0).getPartitionColumns()), event1);
     }
 
     private AlertStreamEvent create(String streamId) {
@@ -152,9 +155,12 @@ public class TestAlertPublisherBolt {
         publisherBolt.onAlertPublishSpecChange(spec3, null);
     }
 
+    @SuppressWarnings("rawtypes")
     @Test
     public void testAlertPublisher() throws Exception {
         AlertPublisher alertPublisher = new 
AlertPublisherImpl("alert-publisher-test");
+        Config config = ConfigFactory.load("application-test.conf");
+        alertPublisher.init(config, new HashMap());
         List<Publishment> oldPubs = loadEntities("/publishments1.json", 
Publishment.class);
         List<Publishment> newPubs = loadEntities("/publishments2.json", 
Publishment.class);
         alertPublisher.onPublishChange(oldPubs, null, null, null);
@@ -286,9 +292,12 @@ public class TestAlertPublisherBolt {
         AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", 
"testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", 
"distribution switch", "us");
         AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", 
"testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", 
"distribution switch", "us");
 
-        publisher.nextEvent(event1);
-        publisher.nextEvent(event2);
-        publisher.nextEvent(event3);
+        publisher.nextEvent(new PublishPartition(event1.getStreamId(), 
event1.getPolicyId(),
+            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event1);
+        publisher.nextEvent(new PublishPartition(event2.getStreamId(), 
event2.getPolicyId(),
+            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event2);
+        publisher.nextEvent(new PublishPartition(event3.getStreamId(), 
event3.getPolicyId(),
+            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
index 1fc54a9..4c56630 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
@@ -24,6 +24,7 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -57,9 +58,9 @@ public class TestStateCheckPolicy {
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> 
anchors, List<Object> tuple) {
                 verified.set(true);
-                Assert.assertEquals("perfmon_latency_check_output2", 
tuple.get(0));
+                Assert.assertEquals("perfmon_latency_stream", 
((PublishPartition) tuple.get(0)).getStreamId());
                 AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: 
[streamId=[%s], tuple=[%s] ", streamId, tuple));
+                System.out.println(String.format("collector received: 
[streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), 
tuple));
                 return null;
             }
 
@@ -83,6 +84,16 @@ public class TestStateCheckPolicy {
         AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector);
         AlertBoltSpec spec = createAlertSpec();
         Map<String, StreamDefinition> definitionMap = createStreamMap();
+        
+        
+        List<PolicyDefinition> policies = 
mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
+                new TypeReference<List<PolicyDefinition>>() {
+                });
+        List<StreamDefinition> streams = 
mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
+                new TypeReference<List<StreamDefinition>>() {
+                });
+        spec.addPublishPartition(streams.get(0).getStreamId(), 
policies.get(0).getName(), "testPublishBolt", null);
+        
         alertBolt.onAlertBoltSpecChange(spec, definitionMap);
 
         // send data now

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json
index 6a4918a..96cc015 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json
@@ -19,6 +19,7 @@
       "mail.connection": "tls",
       "mail.smtp.port": "587"
     },
-    "dedupIntervalMin": "PT0M"
+    "dedupIntervalMin": "PT0M",
+    "serializer": 
"org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
   }
 ]
\ No newline at end of file

Reply via email to