Repository: incubator-eagle Updated Branches: refs/heads/master aeec7f389 -> 70600b260
EAGLE-794: support publish bolt parallelism Author: Li, Garrett Reviewer: ralphsu This closes #687 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/70600b26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/70600b26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/70600b26 Branch: refs/heads/master Commit: 70600b2609c20c3f1385e60813823abf48c63c03 Parents: aeec7f3 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Fri Nov 25 09:47:41 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Mon Nov 28 20:37:22 2016 +0800 ---------------------------------------------------------------------- .../alert/coordination/model/AlertBoltSpec.java | 22 ++- .../engine/coordinator/PublishPartition.java | 121 +++++++++++++ .../alert/engine/coordinator/Publishment.java | 34 ++-- .../impl/MonitorMetadataGenerator.java | 18 +- .../impl/AlertBoltOutputCollectorWrapper.java | 54 +++++- .../alert/engine/publisher/AlertPublisher.java | 11 +- .../publisher/impl/AbstractPublishPlugin.java | 1 - .../publisher/impl/AlertPublisherImpl.java | 174 +++++++------------ .../engine/router/AlertBoltSpecListener.java | 4 +- .../eagle/alert/engine/runner/AlertBolt.java | 29 +++- .../alert/engine/runner/AlertPublisherBolt.java | 19 +- .../alert/engine/runner/UnitTopologyRunner.java | 3 +- .../alert/engine/router/TestAlertBolt.java | 13 +- .../engine/router/TestAlertPublisherBolt.java | 19 +- .../engine/statecheck/TestStateCheckPolicy.java | 15 +- .../src/test/resources/publishments2.json | 3 +- 16 files changed, 387 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java index b3adda5..19a803b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java @@ -17,12 +17,17 @@ package org.apache.eagle.alert.coordination.model; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Joiner; + import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * The alert specification for topology bolts. @@ -40,6 +45,8 @@ public class AlertBoltSpec { // mapping from boltId to list of PolicyDefinition's Ids private Map<String, List<String>> boltPolicyIdsMap = new HashMap<String, List<String>>(); + private Set<PublishPartition> publishPartitions = new HashSet<>(); + public AlertBoltSpec() { } @@ -87,6 +94,18 @@ public class AlertBoltSpec { } } + public Set<PublishPartition> getPublishPartitions() { + return publishPartitions; + } + + public void setPublishPartitions(Set<PublishPartition> publishPartitions) { + this.publishPartitions = publishPartitions; + } + + public void addPublishPartition(String streamId, String policyId, String publishId, Set<String> columns) { + this.publishPartitions.add(new PublishPartition(streamId, policyId, publishId, columns)); + } + @JsonIgnore public Map<String, List<PolicyDefinition>> getBoltPoliciesMap() { return boltPoliciesMap; @@ -107,7 +126,8 @@ public class AlertBoltSpec { @Override public String toString() { - return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", version, topologyName, boltPolicyIdsMap); + return String.format("version:%s-topo:%s, boltPolicyIdsMap %s, publishPartitions %s", + version, topologyName, boltPolicyIdsMap, Joiner.on(",").join(publishPartitions)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java new file mode 100644 index 0000000..7e57f88 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.coordinator; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Objects; + +public class PublishPartition implements Serializable { + + private static final long serialVersionUID = 2524776632955586234L; + + private String policyId; + private String streamId; + private String publishId; + private Set<String> columns = new HashSet<>(); + + @JsonIgnore + private Set<Object> columnValues = new HashSet<>(); + + public PublishPartition() { + } + + public PublishPartition(String streamId, String policyId, String publishId, Set<String> columns) { + this.streamId = streamId; + this.policyId = policyId; + this.publishId = publishId; + if (columns != null) { + this.columns = columns; + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(streamId).append(policyId).append(publishId).append(columns).append(columnValues).build(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof PublishPartition + && Objects.equal(this.streamId, ((PublishPartition) obj).getStreamId()) + && Objects.equal(this.policyId, ((PublishPartition) obj).getPolicyId()) + && Objects.equal(this.publishId, ((PublishPartition) obj).getPublishId()) + && CollectionUtils.isEqualCollection(this.columns, ((PublishPartition) obj).getColumns()) + && CollectionUtils.isEqualCollection(this.columnValues, ((PublishPartition) obj).getColumnValues()); + } + + @Override + public String toString() { + return String.format("PublishPartition[policyId=%s,streamId=%s,publishId=%s,columns=%s,columnValues=%s]", + policyId, streamId, publishId, columns, columnValues); + } + + @Override + public PublishPartition clone() { + return new PublishPartition(this.streamId, this.policyId, this.publishId, new HashSet<>(this.columns)); + } + + public String getPolicyId() { + return policyId; + } + + public void setPolicyId(String policyId) { + this.policyId = policyId; + } + + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public String getPublishId() { + return publishId; + } + + public void setPublishId(String publishId) { + this.publishId = publishId; + } + + public Set<String> getColumns() { + return columns; + } + + public void setColumns(Set<String> columns) { + this.columns = columns; + } + + @JsonIgnore + public Set<Object> getColumnValues() { + return columnValues; + } + + @JsonIgnore + public void setColumnValues(Set<Object> columnValues) { + this.columnValues = columnValues; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java index d1cc33a..74a3d69 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java @@ -19,9 +19,11 @@ package org.apache.eagle.alert.engine.coordinator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang3.builder.HashCodeBuilder; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * @since Apr 11, 2016. @@ -29,6 +31,8 @@ import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) public class Publishment { + public static final String STREAM_NAME_DEFAULT = "_default"; + private String name; private String type; private List<String> policyIds; @@ -42,6 +46,16 @@ public class Publishment { // the class name to extend the IEventSerializer interface private String serializer; + private Set<String> partitionColumns = new HashSet<>(); + + public Set<String> getPartitionColumns() { + return partitionColumns; + } + + public void setPartitionColumns(Set<String> partitionColumns) { + this.partitionColumns = partitionColumns; + } + public String getName() { return name; } @@ -135,13 +149,13 @@ public class Publishment { if (obj instanceof Publishment) { Publishment p = (Publishment) obj; return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType()) - && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) - && Objects.equals(dedupFields, p.getDedupFields()) - && Objects.equals(dedupStateField, p.getDedupStateField()) - && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator()) - && Objects.equals(policyIds, p.getPolicyIds()) - && Objects.equals(streamIds, p.getStreamIds()) - && properties.equals(p.getProperties())); + && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) + && Objects.equals(dedupFields, p.getDedupFields()) + && Objects.equals(dedupStateField, p.getDedupStateField()) + && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator()) + && Objects.equals(policyIds, p.getPolicyIds()) + && Objects.equals(streamIds, p.getStreamIds()) + && properties.equals(p.getProperties())); } return false; } @@ -149,15 +163,15 @@ public class Publishment { @Override public int hashCode() { return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields) - .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds) - .append(properties).build(); + .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds) + .append(properties).build(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:") - .append(policyIds).append(",properties:").append(properties); + .append(policyIds).append(",properties:").append(properties); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java index 3f64f86..fb20e66 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java @@ -38,7 +38,6 @@ import java.util.Map; /** * Given current policy placement, figure out monitor metadata - * * <p>TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create.... * FIXME: too many duplicated code logic : check null; add list to map; add to list..</p> * @@ -136,6 +135,23 @@ public class MonitorMetadataGenerator { for (String policyName : boltUsage.getPolicies()) { PolicyDefinition definition = context.getPolicies().get(policyName); alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName()); + + for (Publishment publish : context.getPublishments().values()) { + if (!publish.getPolicyIds().contains(definition.getName())) { + continue; + } + + List<String> streamIds = new ArrayList<>(); + // add the publish to the bolt + if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) { + streamIds.add(Publishment.STREAM_NAME_DEFAULT); + } else { + streamIds.addAll(publish.getStreamIds()); + } + for (String streamId : streamIds) { + alertSpec.addPublishPartition(streamId, policyName, publish.getName(), publish.getPartitionColumns()); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index 042fca7..b1be2da 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p/> + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,29 +16,57 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContext; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import backtype.storm.task.OutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Arrays; +import backtype.storm.task.OutputCollector; public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { + + private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class); + private final OutputCollector delegate; private final Object outputLock; private final StreamContext streamContext; - public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, StreamContext streamContext) { + private volatile Set<PublishPartition> publishPartitions; + + public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, + StreamContext streamContext) { this.delegate = outputCollector; this.outputLock = outputLock; this.streamContext = streamContext; + + this.publishPartitions = new HashSet<>(); } @Override public void emit(AlertStreamEvent event) { - synchronized (outputLock) { - streamContext.counter().scope("alert_count").incr(); - delegate.emit(Arrays.asList(event.getStreamId(), event)); + Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); + for (PublishPartition publishPartition : clonedPublishPartitions) { + PublishPartition cloned = publishPartition.clone(); + for (String column : cloned.getColumns()) { + int columnIndex = event.getSchema().getColumnIndex(column); + if (columnIndex < 0) { + LOG.warn("Column {} is not found in stream {}", column, cloned.getStreamId()); + continue; + } + cloned.getColumnValues().add(event.getData()[columnIndex]); + } + + synchronized (outputLock) { + streamContext.counter().scope("alert_count").incr(); + delegate.emit(Arrays.asList(cloned, event)); + } } } @@ -49,6 +77,16 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { @Override public void close() { + } + public synchronized void onAlertBoltSpecChange(Collection<PublishPartition> addedPublishPartitions, + Collection<PublishPartition> removedPublishPartitions, + Collection<PublishPartition> modifiedPublishPartitions) { + Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); + clonedPublishPartitions.addAll(addedPublishPartitions); + clonedPublishPartitions.removeAll(removedPublishPartitions); + clonedPublishPartitions.addAll(modifiedPublishPartitions); + publishPartitions = clonedPublishPartitions; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java index cdd52db..9717e2b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java @@ -16,19 +16,22 @@ */ package org.apache.eagle.alert.engine.publisher; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import com.typesafe.config.Config; - import java.io.Serializable; import java.util.Map; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; + +import com.typesafe.config.Config; + public interface AlertPublisher extends AlertPublishListener, Serializable { @SuppressWarnings("rawtypes") void init(Config config, Map stormConfig); String getName(); - void nextEvent(AlertStreamEvent event); + void nextEvent(PublishPartition partition, AlertStreamEvent event); void close(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index f68ae52..b155bb8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine.publisher.impl; import com.google.common.base.Joiner; import com.typesafe.config.Config; -import javafx.scene.control.Alert; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.codec.IEventSerializer; import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 4709180..bbb062b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -17,17 +17,15 @@ package org.apache.eagle.alert.engine.publisher.impl; -import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; @@ -35,7 +33,6 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; import com.typesafe.config.Config; @SuppressWarnings("rawtypes") @@ -44,12 +41,9 @@ public class AlertPublisherImpl implements AlertPublisher { private static final long serialVersionUID = 4809983246198138865L; private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); - private static final String STREAM_NAME_DEFAULT = "_default"; - private final String name; - private volatile Map<String, Set<String>> psPublishPluginMapping = new ConcurrentHashMap<>(1); - private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1); + private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1); private Config config; private Map conf; @@ -69,42 +63,30 @@ public class AlertPublisherImpl implements AlertPublisher { } @Override - public void nextEvent(AlertStreamEvent event) { + public void nextEvent(PublishPartition partition, AlertStreamEvent event) { if (LOG.isDebugEnabled()) { LOG.debug(event.toString()); } - notifyAlert(event); + notifyAlert(partition, event); } - private void notifyAlert(AlertStreamEvent event) { - String policyId = event.getPolicyId(); - if (StringUtils.isEmpty(policyId)) { - LOG.warn("policyId cannot be null for event to be published"); + private void notifyAlert(PublishPartition partition, AlertStreamEvent event) { + // remove the column values for publish plugin match + partition.getColumnValues().clear(); + if (!publishPluginMapping.containsKey(partition)) { + LOG.warn("PublishPartition {} is not found in publish plugin map", partition); return; } - // use default stream name if specified stream publisher is not found - Set<String> pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId, event.getStreamId())); - if (pubIds == null) { - pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId)); - } - if (pubIds == null) { - LOG.warn("Policy {} Stream {} does *NOT* subscribe any publishment!", policyId, event.getStreamId()); + AlertPublishPlugin plugin = publishPluginMapping.get(partition); + if (plugin == null) { + LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition); return; } - event.ensureAlertId(); - for (String pubId : pubIds) { - @SuppressWarnings("resource") - AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null; - if (plugin == null) { - LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId); - continue; - } - try { - LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName()); - plugin.onAlert(event); - } catch (Exception ex) { - LOG.error("Fail invoking publisher's onAlert, continue ", ex); - } + try { + LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName()); + plugin.onAlert(event); + } catch (Exception ex) { + LOG.error("Fail invoking publisher's onAlert, continue ", ex); } } @@ -137,8 +119,7 @@ public class AlertPublisherImpl implements AlertPublisher { } // copy and swap to avoid concurrency issue - Map<String, Set<String>> newPSPublishPluginMapping = new HashMap<>(psPublishPluginMapping); - Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping); + Map<PublishPartition, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping); // added for (Publishment publishment : added) { @@ -146,8 +127,9 @@ public class AlertPublisherImpl implements AlertPublisher { AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { - newPublishMap.put(publishment.getName(), plugin); - addPublishmentPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), publishment.getStreamIds(), publishment.getName()); + for (PublishPartition p : getPublishPartitions(publishment)) { + newPublishMap.put(p, plugin); + } } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -155,94 +137,70 @@ public class AlertPublisherImpl implements AlertPublisher { //removed List<AlertPublishPlugin> toBeClosed = new ArrayList<>(); for (Publishment publishment : removed) { - String pubName = publishment.getName(); - removePublihsPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), pubName); - toBeClosed.add(newPublishMap.get(pubName)); - newPublishMap.remove(publishment.getName()); + AlertPublishPlugin plugin = null; + for (PublishPartition p : getPublishPartitions(publishment)) { + if (plugin == null) { + plugin = newPublishMap.remove(p); + } else { + newPublishMap.remove(p); + } + } + if (plugin != null) { + toBeClosed.add(plugin); + } } // updated - for (int i = 0; i < afterModified.size(); i++) { - String pubName = afterModified.get(i).getName(); - List<String> newPolicies = afterModified.get(i).getPolicyIds(); - List<String> newStreams = afterModified.get(i).getStreamIds(); - List<String> oldPolicies = beforeModified.get(i).getPolicyIds(); - List<String> oldStreams = beforeModified.get(i).getStreamIds(); - - if (!newPolicies.equals(oldPolicies) || !Objects.equal(newStreams, oldStreams)) { - // since both policy & stream may change, skip the compare and difference update - removePublihsPoliciesStreams(newPSPublishPluginMapping, oldPolicies, pubName); - addPublishmentPoliciesStreams(newPSPublishPluginMapping, newPolicies, newStreams, pubName); - } - Publishment newPub = afterModified.get(i); - + for (Publishment publishment : afterModified) { // for updated publishment, need to init them too - AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(newPub, config, conf); - newPublishMap.replace(pubName, newPlugin); + AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); + if (newPlugin != null) { + AlertPublishPlugin plugin = null; + for (PublishPartition p : getPublishPartitions(publishment)) { + if (plugin == null) { + plugin = newPublishMap.get(p); + } + newPublishMap.put(p, newPlugin); + } + if (plugin != null) { + toBeClosed.add(plugin); + } + } else { + LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); + } } // now do the swap publishPluginMapping = newPublishMap; - psPublishPluginMapping = newPSPublishPluginMapping; // safely close : it depend on plugin to check if want to wait all data to be flushed. closePlugins(toBeClosed); } + private Set<PublishPartition> getPublishPartitions(Publishment publish) { + List<String> streamIds = new ArrayList<>(); + // add the publish to the bolt + if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) { + streamIds.add(Publishment.STREAM_NAME_DEFAULT); + } else { + streamIds.addAll(publish.getStreamIds()); + } + Set<PublishPartition> publishPartitions = new HashSet<>(); + for (String streamId : streamIds) { + for (String policyId : publish.getPolicyIds()) { + publishPartitions.add(new PublishPartition(streamId, policyId, publish.getName(), publish.getPartitionColumns())); + } + } + return publishPartitions; + } + private void closePlugins(List<AlertPublishPlugin> toBeClosed) { for (AlertPublishPlugin p : toBeClosed) { try { p.close(); } catch (Exception e) { - LOG.error(MessageFormat.format("Error when close publish plugin {}, {}!", p.getClass().getCanonicalName()), e); + LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e); } } } - private void addPublishmentPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping, - List<String> addedPolicyIds, List<String> addedStreamIds, String pubName) { - if (addedPolicyIds == null || pubName == null) { - return; - } - - if (addedStreamIds == null || addedStreamIds.size() <= 0) { - addedStreamIds = new ArrayList<String>(); - addedStreamIds.add(STREAM_NAME_DEFAULT); - } - - for (String policyId : addedPolicyIds) { - for (String streamId : addedStreamIds) { - String psUniqueId = getPolicyStreamUniqueId(policyId, streamId); - newPSPublishPluginMapping.putIfAbsent(psUniqueId, new HashSet<>()); - newPSPublishPluginMapping.get(psUniqueId).add(pubName); - } - } - } - - private synchronized void removePublihsPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping, - List<String> deletedPolicyIds, String pubName) { - if (deletedPolicyIds == null || pubName == null) { - return; - } - - for (String policyId : deletedPolicyIds) { - for (Entry<String, Set<String>> entry : newPSPublishPluginMapping.entrySet()) { - if (entry.getKey().startsWith("policyId:" + policyId)) { - entry.getValue().remove(pubName); - break; - } - } - } - } - - private String getPolicyStreamUniqueId(String policyId) { - return getPolicyStreamUniqueId(policyId, STREAM_NAME_DEFAULT); - } - - private String getPolicyStreamUniqueId(String policyId, String streamId) { - if (StringUtils.isBlank(streamId)) { - streamId = STREAM_NAME_DEFAULT; - } - return String.format("policyId:%s,streamId:%s", policyId, streamId); - } - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java index 84345dd..e1f3e9c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java @@ -19,11 +19,11 @@ package org.apache.eagle.alert.engine.router; +import java.util.Map; + import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import java.util.Map; - /** * Since 5/2/16. */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index 639d338..627a218 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -16,18 +16,22 @@ */ package org.apache.eagle.alert.engine.runner; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; +import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContextImpl; import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; import org.apache.eagle.alert.engine.coordinator.MetadataType; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper; @@ -61,13 +65,15 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class); private static final long serialVersionUID = -4132297691448945672L; private PolicyGroupEvaluator policyGroupEvaluator; - private AlertStreamCollector alertOutputCollector; + private AlertBoltOutputCollectorWrapper alertOutputCollector; private String boltId; private boolean logEventEnabled; private volatile Object outputLock; // mapping from policy name to PolicyDefinition private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies + private volatile Set<PublishPartition> cachedPublishPartitions = new HashSet<>(); + private AlertBoltSpec spec; public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) { @@ -174,6 +180,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen super.cleanup(); } + @SuppressWarnings("unchecked") @Override public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) { List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId); @@ -189,6 +196,24 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds); + // update alert output collector + Set<PublishPartition> tempPublishPartitions = new HashSet<>(); + spec.getPublishPartitions().forEach(p -> { + if (newPolicies.stream().filter(o -> o.getName().equals(p.getPolicyId())).count() > 0) { + tempPublishPartitions.add(p); + } + }); + + Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(tempPublishPartitions, cachedPublishPartitions); + Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions, tempPublishPartitions); + Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(tempPublishPartitions, cachedPublishPartitions); + + LOG.debug("added PublishPartition " + addedPublishPartitions); + LOG.debug("removed PublishPartition " + removedPublishPartitions); + LOG.debug("modified PublishPartition " + modifiedPublishPartitions); + + alertOutputCollector.onAlertBoltSpecChange(addedPublishPartitions, removedPublishPartitions, modifiedPublishPartitions); + // switch cachedPolicies = newPoliciesMap; sdf = sds; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 323f682..72eafe4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p/> + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -50,9 +50,16 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private volatile Map<String, PolicyDefinition> policyDefinitionMap; private volatile Map<String, StreamDefinition> streamDefinitionMap; + private boolean logEventEnabled; + private TopologyContext context; + public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) { super(alertPublisherName, coordinatorService, config); this.alertPublisher = new AlertPublisherImpl(alertPublisherName); + + if (config != null && config.hasPath("topology.logEventEnabled")) { + logEventEnabled = config.getBoolean("topology.logEventEnabled"); + } } @Override @@ -61,15 +68,20 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT); this.alertPublisher.init(config, stormConf); streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context); + this.context = context; } @Override public void execute(Tuple input) { try { streamContext.counter().scope("receive_count"); + PublishPartition partition = (PublishPartition) input.getValueByField(AlertConstants.FIELD_0); AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1); + if (logEventEnabled) { + LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); + } wrapAlertPublishEvent(event); - alertPublisher.nextEvent(event); + alertPublisher.nextEvent(partition, event); this.collector.ack(input); streamContext.counter().scope("ack_count"); } catch (Exception ex) { @@ -123,6 +135,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli this.streamDefinitionMap = sds; } + @SuppressWarnings("unchecked") private void wrapAlertPublishEvent(AlertStreamEvent event) { Map<String, Object> extraData = new HashedMap(); List<String> appIds = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index 3b40afd..287d5db 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -205,8 +205,7 @@ public class UnitTopologyRunner { // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks); for (int i = 0; i < numOfAlertBolts; i++) { - //boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); - boltDeclarer.shuffleGrouping(alertBoltNamePrefix + i); + boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); } return builder.createTopology(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java index 6529f64..e6acc3e 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java @@ -28,6 +28,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; @@ -86,9 +87,10 @@ public class TestAlertBolt { public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { alertCount.incrementAndGet(); mutex.release(); - Assert.assertEquals("testAlertStream", tuple.get(0)); + Assert.assertEquals("testAlertStream", ((PublishPartition) tuple.get(0)).getStreamId()); + Assert.assertEquals("testAlertPublish", ((PublishPartition) tuple.get(0)).getPublishId()); AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); + System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple)); return null; } @@ -107,6 +109,7 @@ public class TestAlertBolt { @Override public void reportError(Throwable error) { } + }); AlertBolt bolt = createAlertBolt(collector); @@ -141,6 +144,7 @@ public class TestAlertBolt { pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;"; spec.addBoltPolicy("alertBolt1", pd.getName()); spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd))); + spec.addPublishPartition("testAlertStream", "policy1", "testAlertPublish", null); bolt.onAlertBoltSpecChange(spec, sds); // contruct GeneralTopologyContext @@ -166,7 +170,8 @@ public class TestAlertBolt { event2.setData(data); event2.setStreamId(streamId); PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001); - + + Thread.sleep(3000); Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default"); Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default"); bolt.execute(input); @@ -426,6 +431,8 @@ public class TestAlertBolt { StreamDefinition sdTest = new StreamDefinition(); sdTest.setStreamId(TEST_STREAM); sds.put(sdTest.getStreamId(), sdTest); + + boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null); bolt.onAlertBoltSpecChange(boltSpecs, sds); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index c95cab1..46517fe 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; @@ -62,9 +63,11 @@ public class TestAlertPublisherBolt { PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class); publisher.onPublishChange(spec.getPublishments(), null, null, null); AlertStreamEvent event = create("testAlertStream"); - publisher.nextEvent(event); + publisher.nextEvent(new PublishPartition(event.getStreamId(), event.getPolicyId(), + spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event); AlertStreamEvent event1 = create("testAlertStream"); - publisher.nextEvent(event1); + publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(), + spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event1); } private AlertStreamEvent create(String streamId) { @@ -152,9 +155,12 @@ public class TestAlertPublisherBolt { publisherBolt.onAlertPublishSpecChange(spec3, null); } + @SuppressWarnings("rawtypes") @Test public void testAlertPublisher() throws Exception { AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test"); + Config config = ConfigFactory.load("application-test.conf"); + alertPublisher.init(config, new HashMap()); List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class); List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class); alertPublisher.onPublishChange(oldPubs, null, null, null); @@ -286,9 +292,12 @@ public class TestAlertPublisherBolt { AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", "distribution switch", "us"); AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", "distribution switch", "us"); - publisher.nextEvent(event1); - publisher.nextEvent(event2); - publisher.nextEvent(event3); + publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(), + pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event1); + publisher.nextEvent(new PublishPartition(event2.getStreamId(), event2.getPolicyId(), + pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event2); + publisher.nextEvent(new PublishPartition(event3.getStreamId(), event3.getPolicyId(), + pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java index 1fc54a9..4c56630 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java @@ -24,6 +24,7 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.TupleImpl; import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; import org.apache.eagle.alert.engine.model.AlertStreamEvent; @@ -57,9 +58,9 @@ public class TestStateCheckPolicy { @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { verified.set(true); - Assert.assertEquals("perfmon_latency_check_output2", tuple.get(0)); + Assert.assertEquals("perfmon_latency_stream", ((PublishPartition) tuple.get(0)).getStreamId()); AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); + System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple)); return null; } @@ -83,6 +84,16 @@ public class TestStateCheckPolicy { AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector); AlertBoltSpec spec = createAlertSpec(); Map<String, StreamDefinition> definitionMap = createStreamMap(); + + + List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"), + new TypeReference<List<PolicyDefinition>>() { + }); + List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"), + new TypeReference<List<StreamDefinition>>() { + }); + spec.addPublishPartition(streams.get(0).getStreamId(), policies.get(0).getName(), "testPublishBolt", null); + alertBolt.onAlertBoltSpecChange(spec, definitionMap); // send data now http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/70600b26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json index 6a4918a..96cc015 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments2.json @@ -19,6 +19,7 @@ "mail.connection": "tls", "mail.smtp.port": "587" }, - "dedupIntervalMin": "PT0M" + "dedupIntervalMin": "PT0M", + "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer" } ] \ No newline at end of file