AMBARI-7480 - Alerts: data collection starts on install before config values are available (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ae9bf4c3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ae9bf4c3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ae9bf4c3 Branch: refs/heads/trunk Commit: ae9bf4c38705074821c853301c079f2f2c484ed7 Parents: 090ed51 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Wed Sep 24 12:56:54 2014 -0700 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Wed Sep 24 13:49:59 2014 -0700 ---------------------------------------------------------------------- .../server/agent/AlertDefinitionCommand.java | 15 ++-- .../ambari/server/agent/HeartBeatHandler.java | 2 +- .../server/api/services/AmbariMetaInfo.java | 45 ++++++---- .../server/controller/ControllerModule.java | 4 + .../AlertDefinitionResourceProvider.java | 11 ++- .../AlertDefinitionRegistrationEvent.java | 57 ++++++++++++ .../ambari/server/events/AmbariEvent.java | 24 ++--- .../ambari/server/events/ClusterEvent.java | 50 +++++++++++ .../ambari/server/events/ServiceEvent.java | 2 +- .../listeners/AlertAggregateListener.java | 92 ++++++++------------ .../listeners/AlertLifecycleListener.java | 73 ++++++++++++++++ .../server/orm/dao/AlertDefinitionDAO.java | 40 +++++++++ .../state/alert/AggregateDefinitionMapping.java | 88 +++++++++++++++++++ .../server/state/alert/AlertDefinition.java | 20 +++++ .../state/alert/AlertDefinitionFactory.java | 3 +- .../server/state/alert/AlertDefinitionHash.java | 19 +++- .../AlertGroupResourceProviderTest.java | 2 +- .../apache/ambari/server/orm/OrmTestHelper.java | 2 +- .../server/orm/dao/AlertDefinitionDAOTest.java | 2 +- .../server/orm/dao/AlertDispatchDAOTest.java | 2 +- .../ambari/server/orm/dao/AlertsDAOTest.java | 6 +- .../state/cluster/AlertDataManagerTest.java | 59 ++++++++----- 22 files changed, 482 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java index 5ae1741..fdd7f1d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java @@ -34,6 +34,12 @@ import com.google.gson.annotations.SerializedName; * The {@link AlertDefinitionCommand} class is used to encapsulate the * {@link AlertDefinition}s that will be returned to an agent given a requested * hash. + * <p/> + * Commands must have {@link #addConfigs(ConfigHelper, Cluster)} invoked before + * being sent to the agent so that the definitions will have the required + * configuration data when they run. Failure to do this will cause the alerts to + * be scheduled and run, but the result will always be a failure since the + * parameterized properties they depend on will not be available. */ public class AlertDefinitionCommand extends AgentCommand { @SerializedName("clusterName") @@ -47,7 +53,7 @@ public class AlertDefinitionCommand extends AgentCommand { @SerializedName("alertDefinitions") private final List<AlertDefinition> m_definitions; - + @SerializedName("configurations") private Map<String, Map<String, String>> m_configurations; @@ -118,7 +124,7 @@ public class AlertDefinitionCommand extends AgentCommand { /** * Adds cluster configuration properties as required by commands sent to agent. - * + * * @param configHelper the helper * @param cluster the cluster, matching the cluster name specified by the command */ @@ -129,7 +135,7 @@ public class AlertDefinitionCommand extends AgentCommand { Map<String, Map<String, String>> allConfigTags = configHelper.getEffectiveDesiredTags(cluster, m_hostName); - + for(Config clusterConfig: cluster.getAllConfigs()) { if (null == clusterConfig) { // !!! hard to believe @@ -158,8 +164,5 @@ public class AlertDefinitionCommand extends AgentCommand { m_configurations.put(clusterConfig.getType(), props); } - } - - } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index a366301..c826153 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -831,8 +831,8 @@ public class HeartBeatHandler { String hash = alertDefinitionHash.getHash(clusterName, hostname); AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, hostname, hash, definitions); - command.addConfigs(configHelper, cluster); + command.addConfigs(configHelper, cluster); commands.add(command); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java index ec53afc..c15f73b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java @@ -47,7 +47,8 @@ import org.apache.ambari.server.api.util.StackExtensionHelper; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.customactions.ActionDefinition; import org.apache.ambari.server.customactions.ActionDefinitionManager; -import org.apache.ambari.server.events.listeners.AlertAggregateListener; +import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.MetainfoDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; @@ -66,7 +67,6 @@ import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.StackInfo; import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.AlertDefinitionFactory; -import org.apache.ambari.server.state.alert.SourceType; import org.apache.ambari.server.state.stack.LatestRepoCallable; import org.apache.ambari.server.state.stack.MetricDefinition; import org.apache.ambari.server.state.stack.RepositoryXml; @@ -149,6 +149,16 @@ public class AmbariMetaInfo { */ private AlertDefinitionFactory alertDefinitionFactory; + /** + * Publishes the following events: + * <ul> + * <li>{@link AlertDefinitionRegistrationEvent} when new alerts are merged + * from the stack</li> + * </ul> + */ + @Inject + private AmbariEventPublisher eventPublisher; + // Required properties by stack version private final Map<StackId, Map<String, Map<String, PropertyInfo>>> requiredProperties = new HashMap<StackId, Map<String, Map<String, PropertyInfo>>>(); @@ -704,7 +714,7 @@ public class AmbariMetaInfo { return propertiesResult; } - + public Set<PropertyInfo> getStackProperties(String stackName, String version) throws AmbariException { StackInfo stackInfo = getStackInfo(stackName, version); @@ -742,27 +752,30 @@ public class AmbariMetaInfo { return propertyResult; } - + public Set<PropertyInfo> getStackPropertiesByName(String stackName, String version, String propertyName) throws AmbariException { Set<PropertyInfo> properties = getStackProperties(stackName, version); - if (properties.size() == 0) + if (properties.size() == 0) { throw new StackAccessException("stackName=" + stackName + ", stackVersion=" + version + ", propertyName=" + propertyName); + } Set<PropertyInfo> propertyResult = new HashSet<PropertyInfo>(); for (PropertyInfo property : properties) { - if (property.getName().equals(propertyName)) + if (property.getName().equals(propertyName)) { propertyResult.add(property); + } } - if (propertyResult.isEmpty()) + if (propertyResult.isEmpty()) { throw new StackAccessException("stackName=" + stackName + ", stackVersion=" + version + ", propertyName=" + propertyName); + } return propertyResult; } @@ -1280,18 +1293,16 @@ public class AmbariMetaInfo { } alertDefinitionDao.createOrUpdate(entity); } - - // all definitions have been resolved. pull and initialize the aggregates + + // all definition resolved; publish their registration for (AlertDefinitionEntity def : alertDefinitionDao.findAll(cluster.getClusterId())) { - if (def.getSourceType().equals(SourceType.AGGREGATE)) { - AlertDefinition realDef = alertDefinitionFactory.coerce(def); - - AlertAggregateListener listener = injector.getInstance(AlertAggregateListener.class); - - listener.addAggregateType(cluster.getClusterId(), realDef); - } + AlertDefinition realDef = alertDefinitionFactory.coerce(def); + + AlertDefinitionRegistrationEvent event = new AlertDefinitionRegistrationEvent( + cluster.getClusterId(), realDef); + + eventPublisher.publish(event); } - } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index cfab1f8..da78639 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -53,6 +53,8 @@ import org.apache.ambari.server.controller.internal.HostResourceProvider; import org.apache.ambari.server.controller.internal.MemberResourceProvider; import org.apache.ambari.server.controller.internal.ServiceResourceProvider; import org.apache.ambari.server.controller.spi.ResourceProvider; +import org.apache.ambari.server.events.listeners.AlertAggregateListener; +import org.apache.ambari.server.events.listeners.AlertLifecycleListener; import org.apache.ambari.server.events.listeners.AlertReceivedListener; import org.apache.ambari.server.events.listeners.AlertServiceStateListener; import org.apache.ambari.server.events.listeners.AlertStateChangedListener; @@ -337,5 +339,7 @@ public class ControllerModule extends AbstractModule { bind(AlertReceivedListener.class).asEagerSingleton(); bind(AlertStateChangedListener.class).asEagerSingleton(); bind(AlertServiceStateListener.class).asEagerSingleton(); + bind(AlertLifecycleListener.class).asEagerSingleton(); + bind(AlertAggregateListener.class).asEagerSingleton(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java index 7cb9886..e2d2837 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java @@ -330,7 +330,7 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP if (null != desiredScope && desiredScope.length() > 0) { scope = Scope.valueOf(desiredScope); } - + SourceType sourceType = null; if (null != type && type.length() > 0) { sourceType = SourceType.valueOf(type); @@ -473,18 +473,17 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP private Resource toResource(boolean isCollection, String clusterName, AlertDefinitionEntity entity, Set<String> requestedIds) { Resource resource = new ResourceImpl(Resource.Type.AlertDefinition); + resource.setProperty(ALERT_DEF_ID, entity.getDefinitionId()); + resource.setProperty(ALERT_DEF_CLUSTER_NAME, clusterName); + resource.setProperty(ALERT_DEF_NAME, entity.getDefinitionName()); + resource.setProperty(ALERT_DEF_LABEL, entity.getLabel()); - setResourceProperty(resource, ALERT_DEF_CLUSTER_NAME, clusterName, requestedIds); - setResourceProperty(resource, ALERT_DEF_ID, entity.getDefinitionId(), requestedIds); - setResourceProperty(resource, ALERT_DEF_NAME, entity.getDefinitionName(), requestedIds); setResourceProperty(resource, ALERT_DEF_INTERVAL, entity.getScheduleInterval(), requestedIds); setResourceProperty(resource, ALERT_DEF_SERVICE_NAME, entity.getServiceName(), requestedIds); setResourceProperty(resource, ALERT_DEF_COMPONENT_NAME, entity.getComponentName(), requestedIds); setResourceProperty(resource, ALERT_DEF_ENABLED, Boolean.valueOf(entity.getEnabled()), requestedIds); setResourceProperty(resource, ALERT_DEF_SCOPE, entity.getScope(), requestedIds); setResourceProperty(resource, ALERT_DEF_SOURCE_TYPE, entity.getSourceType(), requestedIds); - setResourceProperty(resource, ALERT_DEF_LABEL, entity.getLabel(), - requestedIds); if (!isCollection && null != resource.getPropertyValue(ALERT_DEF_SOURCE_TYPE)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionRegistrationEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionRegistrationEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionRegistrationEvent.java new file mode 100644 index 0000000..4b68ae9 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionRegistrationEvent.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import org.apache.ambari.server.state.alert.AlertDefinition; + +/** + * The {@link AlertDefinitionRegistrationEvent} is used to represent that an + * {@link AlertDefinition} is now a part of the system. This usually happens at + * startup when the alerts are being merged from the stack. It also occurs when + * a new alert is created via the REST APIs or when a new service is installed. + */ +public class AlertDefinitionRegistrationEvent extends ClusterEvent { + + /** + * The newly registered alert defintiion + */ + private final AlertDefinition m_definition; + + /** + * Constructor. + * + * @param clusterId + * the ID of the cluster that the definition is in. + * @param definition + * the alert definition being registered. + */ + public AlertDefinitionRegistrationEvent( + long clusterId, AlertDefinition definition) { + super(AmbariEventType.ALERT_DEFINITION_REGISTRATION, clusterId); + m_definition = definition; + } + + /** + * Get the registered alert definition. + * + * @return the alert definition (not {@code null}). + */ + public AlertDefinition getDefinition() { + return m_definition; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java index dda18c7..6cf752e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java @@ -30,7 +30,12 @@ public abstract class AmbariEvent { /** * A service was successfully installed. */ - SERVICE_INSTALL_SUCCESS; + SERVICE_INSTALL_SUCCESS, + + /** + * An alert definition is registered with the system. + */ + ALERT_DEFINITION_REGISTRATION; } /** @@ -39,29 +44,14 @@ public abstract class AmbariEvent { protected final AmbariEventType m_eventType; /** - * The cluster ID. - */ - protected final long m_clusterId; - - /** * Constructor. * * @param eventType * the type of event (not {@code null}). * @param clusterId */ - public AmbariEvent(AmbariEventType eventType, long clusterId) { + public AmbariEvent(AmbariEventType eventType) { m_eventType = eventType; - m_clusterId = clusterId; - } - - /** - * Gets the cluster ID that the event belongs to. - * - * @return the ID of the cluster. - */ - public long getClusterId() { - return m_clusterId; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterEvent.java new file mode 100644 index 0000000..8f39ac3 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +/** + * The {@link ClusterEvent} represents all events in Ambari that can be scoped + * within a cluster. + */ +public class ClusterEvent extends AmbariEvent { + /** + * The cluster ID. + */ + protected final long m_clusterId; + + /** + * Constructor. + * + * @param eventType + * @param clusterId + */ + public ClusterEvent(AmbariEventType eventType, long clusterId) { + super(eventType); + m_clusterId = clusterId; + } + + /** + * Gets the cluster ID that the event belongs to. + * + * @return the ID of the cluster. + */ + public long getClusterId() { + return m_clusterId; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceEvent.java index 5e01431..3bc5c17 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceEvent.java @@ -21,7 +21,7 @@ package org.apache.ambari.server.events; /** * The {@link ServiceEvent} class is the base for all service events in Ambari. */ -public abstract class ServiceEvent extends AmbariEvent { +public abstract class ServiceEvent extends ClusterEvent { /** * The name of the service. http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java index 8340c26..a0c8e46 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java @@ -18,10 +18,6 @@ package org.apache.ambari.server.events.listeners; import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.ambari.server.events.AlertReceivedEvent; import org.apache.ambari.server.events.publishers.AlertEventPublisher; @@ -29,116 +25,100 @@ import org.apache.ambari.server.orm.dao.AlertSummaryDTO; import org.apache.ambari.server.orm.dao.AlertsDAO; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; import org.apache.ambari.server.state.alert.AggregateSource; import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.Reporting; -import org.apache.ambari.server.state.alert.Reporting.ReportTemplate; +import org.apache.ambari.server.state.alert.SourceType; import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; +import com.google.inject.Singleton; /** - * Used to aggregate alerts. + * The {@link AlertAggregateListener} is used to listen for all incoming + * {@link AlertReceivedEvent} instances and determine if there exists a + * {@link SourceType#AGGREGATE} alert that needs to run. */ +@Singleton public class AlertAggregateListener { @Inject private AlertsDAO m_alertsDao = null; - private AlertEventPublisher m_publisher = null; - private Map<Long, Map<String, AlertDefinition>> m_aggregateMap = - new HashMap<Long, Map<String, AlertDefinition>>(); - + /** + * The event publisher used to receive incoming events and publish new events + * when an aggregate alert is run. + */ + private final AlertEventPublisher m_publisher; + + /** + * Used for quick lookups of aggregate alerts. + */ + @Inject + private AggregateDefinitionMapping m_aggregateMapping; + @Inject public AlertAggregateListener(AlertEventPublisher publisher) { m_publisher = publisher; - - publisher.register(this); + m_publisher.register(this); } - + /** * Consume an alert that was received. */ @Subscribe public void onAlertEvent(AlertReceivedEvent event) { - AlertDefinition def = getAggregateDefinition(event.getClusterId(), event.getAlert().getName()); - + AlertDefinition def = m_aggregateMapping.getAggregateDefinition( + event.getClusterId(), event.getAlert().getName()); + if (null == def || null == m_alertsDao) { return; } - + AggregateSource as = (AggregateSource) def.getSource(); - + AlertSummaryDTO summary = m_alertsDao.findAggregateCounts( event.getClusterId(), as.getAlertName()); - + Alert alert = new Alert(def.getName(), null, def.getServiceName(), null, null, AlertState.UNKNOWN); + alert.setLabel(def.getLabel()); alert.setTimestamp(System.currentTimeMillis()); - + if (0 == summary.getOkCount()) { alert.setText("Cannot determine, there are no records"); } else if (summary.getUnknownCount() > 0) { alert.setText("There are alerts with status UNKNOWN."); } else { Reporting reporting = as.getReporting(); - + int numerator = summary.getCriticalCount() + summary.getWarningCount(); int denominator = summary.getOkCount(); double value = (double)(numerator) / denominator; - + if (value > reporting.getCritical().getValue().doubleValue()) { alert.setState(AlertState.CRITICAL); alert.setText(MessageFormat.format(reporting.getCritical().getText(), Integer.valueOf(denominator), Integer.valueOf(numerator))); - + } else if (value > reporting.getWarning().getValue().doubleValue()) { alert.setState(AlertState.WARNING); alert.setText(MessageFormat.format(reporting.getWarning().getText(), Integer.valueOf(denominator), Integer.valueOf(numerator))); - + } else { alert.setState(AlertState.OK); alert.setText(MessageFormat.format(reporting.getOk().getText(), Integer.valueOf(denominator), Integer.valueOf(numerator))); } - + } - + // make a new event and allow others to consume it AlertReceivedEvent aggEvent = new AlertReceivedEvent(event.getClusterId(), alert); - - m_publisher.publish(aggEvent); - } - - private AlertDefinition getAggregateDefinition(long clusterId, String name) { - Long id = Long.valueOf(clusterId); - if (!m_aggregateMap.containsKey(id)) - return null; - - if (!m_aggregateMap.get(id).containsKey(name)) - return null; - - return m_aggregateMap.get(id).get(name); - } - /** - * @param source the aggregate source - */ - public void addAggregateType(long clusterId, AlertDefinition definition) { - Long id = Long.valueOf(clusterId); - - if (!m_aggregateMap.containsKey(id)) { - m_aggregateMap.put(id, new HashMap<String, AlertDefinition>()); - } - - Map<String, AlertDefinition> map = m_aggregateMap.get(id); - - AggregateSource as = (AggregateSource) definition.getSource(); - - map.put(as.getAlertName(), definition); + m_publisher.publish(aggEvent); } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java new file mode 100644 index 0000000..43d4b35 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events.listeners; + +import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; +import org.apache.ambari.server.state.alert.AlertDefinition; +import org.apache.ambari.server.state.alert.SourceType; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * The {@link AlertLifecycleListener} handles events that are part of the alert + * infrastructure lifecycle such as definition registration events. + */ +@Singleton +public class AlertLifecycleListener { + + /** + * Used for quick lookups of aggregate alerts. + */ + @Inject + private AggregateDefinitionMapping m_aggregateMapping; + + /** + * Constructor. + * + * @param publisher + */ + @Inject + public AlertLifecycleListener(AmbariEventPublisher publisher) { + publisher.register(this); + } + + /** + * Handles {@link AlertDefinitionRegistrationEvent} by performing the + * following tasks: + * <ul> + * <li>Registration with {@link AggregateDefinitionMapping}</li> + * </ul> + * + * @param event + * the event being handled. + */ + @Subscribe + @AllowConcurrentEvents + public void onAmbariEvent(AlertDefinitionRegistrationEvent event) { + AlertDefinition definition = event.getDefinition(); + + if (definition.getSource().getType() == SourceType.AGGREGATE) { + m_aggregateMapping.addAggregateType(event.getClusterId(), definition); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java index 570f268..075ee04 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java @@ -25,9 +25,15 @@ import javax.persistence.EntityManager; import javax.persistence.TypedQuery; import org.apache.ambari.server.controller.RootServiceResponseFactory; +import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertGroupEntity; +import org.apache.ambari.server.state.alert.AlertDefinition; +import org.apache.ambari.server.state.alert.AlertDefinitionFactory; import org.apache.ambari.server.state.alert.Scope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.inject.Inject; import com.google.inject.Provider; @@ -41,6 +47,11 @@ import com.google.inject.persist.Transactional; @Singleton public class AlertDefinitionDAO { /** + * Logger. + */ + private static Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAO.class); + + /** * JPA entity manager */ @Inject @@ -65,6 +76,23 @@ public class AlertDefinitionDAO { AlertDispatchDAO dispatchDao; /** + * Publishes the following events: + * <ul> + * <li>{@link AlertDefinitionRegistrationEvent} when new alerts are merged + * from the stack</li> + * </ul> + */ + @Inject + private AmbariEventPublisher eventPublisher; + + /** + * A factory that assists in the creation of {@link AlertDefinition} and + * {@link AlertDefinitionEntity}. + */ + @Inject + private AlertDefinitionFactory alertDefinitionFactory; + + /** * Gets an alert definition with the specified ID. * * @param definitionId @@ -264,6 +292,18 @@ public class AlertDefinitionDAO { group.addAlertDefinition(alertDefinition); dispatchDao.merge(group); } + + // publish the alert definition registration + AlertDefinition coerced = alertDefinitionFactory.coerce(alertDefinition); + if (null != coerced) { + AlertDefinitionRegistrationEvent event = new AlertDefinitionRegistrationEvent( + alertDefinition.getClusterId(), coerced); + + eventPublisher.publish(event); + } else { + LOG.warn("Unable to broadcast alert registration event for {}", + alertDefinition.getDefinitionName()); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java new file mode 100644 index 0000000..04f20f9 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.state.alert; + +import java.util.HashMap; +import java.util.Map; + +import com.google.inject.Singleton; + +/** + * The {@link AggregateDefinitionMapping} is used to keep an in-memory mapping + * of all of the {@link AlertDefinition}s that have aggregate definitions + * associated with them. + */ +@Singleton +public final class AggregateDefinitionMapping { + /** + * In-memory mapping of cluster ID to definition name / aggregate definition. + * This is used for fast lookups when receiving events. + */ + private Map<Long, Map<String, AlertDefinition>> m_aggregateMap = new HashMap<Long, Map<String, AlertDefinition>>(); + + /** + * Constructor. + * + */ + public AggregateDefinitionMapping() { + } + + /** + * Gets an aggregate definition based on a given alert definition name. + * + * @param clusterId + * the ID of the cluster that the definition is bound to. + * @param name + * the unique name of the definition. + * @return the aggregate definition, or {@code null} if none. + */ + public AlertDefinition getAggregateDefinition(long clusterId, String name) { + Long id = Long.valueOf(clusterId); + if (!m_aggregateMap.containsKey(id)) { + return null; + } + + if (!m_aggregateMap.get(id).containsKey(name)) { + return null; + } + + return m_aggregateMap.get(id).get(name); + } + + /** + * Adds a mapping for a new aggregate definition. + * + * @param clusterId + * the ID of the cluster that the definition is bound to. + * @param name + * the unique name of the definition. + */ + public void addAggregateType(long clusterId, AlertDefinition definition) { + Long id = Long.valueOf(clusterId); + + if (!m_aggregateMap.containsKey(id)) { + m_aggregateMap.put(id, new HashMap<String, AlertDefinition>()); + } + + Map<String, AlertDefinition> map = m_aggregateMap.get(id); + + AggregateSource as = (AggregateSource) definition.getSource(); + + map.put(as.getAlertName(), definition); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java index f227c0a..5058e91 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java @@ -43,6 +43,7 @@ public class AlertDefinition { private boolean enabled = true; private Source source = null; private String label = null; + private String uuid = null; /** * @return the service name @@ -150,6 +151,25 @@ public class AlertDefinition { } /** + * Sets the UUID of the definition + * + * @param definitionUuid + */ + public void setUuid(String definitionUuid) { + uuid = definitionUuid; + } + + /** + * Gets the UUID of the definition. The UUID is a unique identifier that is + * generated every time the definition's state changes. + * + * @return the UUID + */ + public String getUuid() { + return uuid; + } + + /** * Compares {@link #equals(Object)} of every field. This is used mainly for * reconciling the stack versus the database. * http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java index f8e44bd..4f6a9a3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java @@ -132,6 +132,7 @@ public class AlertDefinitionFactory { definition.setScope(entity.getScope()); definition.setServiceName(entity.getServiceName()); definition.setLabel(entity.getLabel()); + definition.setUuid(entity.getHash()); try{ String sourceJson = entity.getSource(); @@ -139,7 +140,7 @@ public class AlertDefinitionFactory { definition.setSource(source); } catch (Exception exception) { LOG.error( - "Unable to deserialized the alert definition source during coercion", + "Unable to deserialize the alert definition source during coercion", exception); return null; http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java index 0a1c73a..9ea039c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java @@ -42,6 +42,7 @@ import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; @@ -51,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.inject.Inject; +import com.google.inject.Provider; import com.google.inject.Singleton; /** @@ -91,6 +93,14 @@ public class AlertDefinitionHash { private ActionQueue m_actionQueue; /** + * Used to add configurations to the {@link AlertDefinitionCommand} instances + * so that alerts can be scheduled to run with access to the properties they + * need. + */ + @Inject + private Provider<ConfigHelper> m_configHelper; + + /** * !!! TODO: this class needs some thoughts on locking */ private ReadWriteLock m_lock = new ReentrantReadWriteLock(); @@ -489,7 +499,7 @@ public class AlertDefinitionHash { * This method is typically called after * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache * invalidation of the alert definition hash. - * + * * @param clusterName * the name of the cluster (not {@code null}). * @param hosts @@ -514,6 +524,13 @@ public class AlertDefinitionHash { AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, hostName, hash, definitions); + try { + Cluster cluster = m_clusters.getCluster(clusterName); + command.addConfigs(m_configHelper.get(), cluster); + } catch (AmbariException ae) { + LOG.warn("Unable to add configurations to alert definition command", ae); + } + // unlike other commands, the alert definitions commands are really // designed to be 1:1 per change; if multiple invalidations happened // before the next heartbeat, there would be several commands that would http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java index d2ce6fb..5583617 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java @@ -479,7 +479,7 @@ public class AlertGroupResourceProviderTest { entity.setScheduleInterval(Integer.valueOf(2)); entity.setServiceName(null); entity.setSourceType(SourceType.METRIC); - entity.setSource(null); + entity.setSource("{\"type\" : \"METRIC\"}"); Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>(); definitions.add(entity); http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java index 08cd7b8..dc71862 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java @@ -330,7 +330,7 @@ public class OrmTestHelper { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + System.currentTimeMillis()); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); alertDefinitionDAO.create(definition); http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java index 3e20cd6..4d182cc 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java @@ -97,7 +97,7 @@ public class AlertDefinitionDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); dao.create(definition); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java index 7f4deda..015acc0 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java @@ -431,7 +431,7 @@ public class AlertDispatchDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java index 6e4d4af..0cc2ae7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java @@ -85,7 +85,7 @@ public class AlertsDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(Integer.valueOf(60)); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); } @@ -209,7 +209,7 @@ public class AlertsDAOTest { hostDef.setHash(UUID.randomUUID().toString()); hostDef.setScheduleInterval(Integer.valueOf(60)); hostDef.setScope(Scope.HOST); - hostDef.setSource("HostService"); + hostDef.setSource("{\"type\" : \"SCRIPT\"}"); hostDef.setSourceType(SourceType.SCRIPT); definitionDao.create(hostDef); @@ -409,7 +409,7 @@ public class AlertsDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(Integer.valueOf(60)); definition.setScope(Scope.SERVICE); - definition.setSource("SourceScript"); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); http://git-wip-us.apache.org/repos/asf/ambari/blob/ae9bf4c3/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java index 00ba942..85a6e9b 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java @@ -50,7 +50,9 @@ import org.apache.ambari.server.orm.entities.AlertNoticeEntity; import org.apache.ambari.server.orm.entities.AlertTargetEntity; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; import org.apache.ambari.server.state.alert.AggregateSource; +import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.AlertDefinitionFactory; import org.apache.ambari.server.state.alert.Reporting; import org.apache.ambari.server.state.alert.Reporting.ReportTemplate; @@ -108,7 +110,7 @@ public class AlertDataManagerTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(Integer.valueOf(60)); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); } @@ -268,7 +270,7 @@ public class AlertDataManagerTest { notices = dispatchDao.findAllNotices(); assertEquals(1, notices.size()); } - + @Test public void testAggregateAlerts() throws Exception { // create definition @@ -281,10 +283,10 @@ public class AlertDataManagerTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(Integer.valueOf(60)); definition.setScope(Scope.HOST); - definition.setSource("Source"); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); - + // create aggregate of definition AlertDefinitionEntity aggDef = new AlertDefinitionEntity(); aggDef.setDefinitionName("aggregate_test"); @@ -294,7 +296,7 @@ public class AlertDataManagerTest { aggDef.setHash(UUID.randomUUID().toString()); aggDef.setScheduleInterval(Integer.valueOf(60)); aggDef.setScope(Scope.SERVICE); - + AggregateSource source = new AggregateSource(); source.setAlertName("to_aggregate"); @@ -304,7 +306,7 @@ public class AlertDataManagerTest { field.set(source, SourceType.AGGREGATE); Reporting reporting = new Reporting(); - + ReportTemplate template = new ReportTemplate(); template.setText("You are good {1}/{0}"); reporting.setOk(template); @@ -313,20 +315,20 @@ public class AlertDataManagerTest { template.setText("Going bad {1}/{0}"); template.setValue(Double.valueOf(0.33d)); reporting.setWarning(template); - + template = new ReportTemplate(); template.setText("On fire! {1}/{0}"); template.setValue(Double.valueOf(0.66d)); reporting.setCritical(template); - + source.setReporting(reporting); Gson gson = new Gson(); - + aggDef.setSource(gson.toJson(source)); aggDef.setSourceType(SourceType.AGGREGATE); definitionDao.create(aggDef); - + // add current and history across four hosts for (int i = 0; i < 4; i++) { AlertHistoryEntity history = new AlertHistoryEntity(); @@ -341,7 +343,7 @@ public class AlertDataManagerTest { history.setHostName("h" + (i+1)); history.setServiceName(definition.getServiceName()); dao.create(history); - + AlertCurrentEntity current = new AlertCurrentEntity(); current.setAlertHistory(history); current.setLatestText(history.getAlertText()); @@ -349,7 +351,7 @@ public class AlertDataManagerTest { current.setOriginalTimestamp(Long.valueOf(1L)); dao.merge(current); } - + AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class); // !!! need a synchronous op for testing @@ -357,17 +359,28 @@ public class AlertDataManagerTest { field.setAccessible(true); field.set(publisher, new EventBus()); - final AtomicReference<Alert> ref = new AtomicReference<Alert>(); + final AtomicReference<Alert> ref = new AtomicReference<Alert>(); publisher.register(new TestListener() { + @Override @Subscribe public void catchIt(AlertReceivedEvent event) { ref.set(event.getAlert()); } }); - + AlertAggregateListener listener = injector.getInstance(AlertAggregateListener.class); AlertDefinitionFactory factory = new AlertDefinitionFactory(); - listener.addAggregateType(clusterId.longValue(), factory.coerce(aggDef)); + AggregateDefinitionMapping aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class); + + AlertDefinition aggregateDefinition = factory.coerce(aggDef); + aggregateMapping.addAggregateType(clusterId.longValue(), + aggregateDefinition ); + + AggregateSource as = (AggregateSource) aggregateDefinition.getSource(); + AlertDefinition aggregatedDefinition = aggregateMapping.getAggregateDefinition( + clusterId.longValue(), as.getAlertName()); + + assertNotNull(aggregatedDefinition); // any alert and event will do that is for the definition since an aggregate // checks them all regardless of state @@ -379,12 +392,12 @@ public class AlertDataManagerTest { "h1", AlertState.OK); AlertReceivedEvent event = new AlertReceivedEvent(clusterId.longValue(), alert); - + listener.onAlertEvent(event); assertNotNull(ref.get()); assertEquals(AlertState.OK, ref.get().getState()); assertTrue(ref.get().getText().indexOf("0/4") > -1); - + // check if one is critical, still ok AlertCurrentEntity current = dao.findCurrentByHostAndName( clusterId.longValue(), "h1", definition.getDefinitionName()); @@ -392,7 +405,7 @@ public class AlertDataManagerTest { dao.merge(current.getAlertHistory()); listener.onAlertEvent(event); - assertEquals("aggregate_test", ref.get().getName()); + assertEquals("aggregate_test", ref.get().getName()); assertEquals(AlertState.OK, ref.get().getState()); assertTrue(ref.get().getText().indexOf("1/4") > -1); @@ -401,7 +414,7 @@ public class AlertDataManagerTest { clusterId.longValue(), "h2", definition.getDefinitionName()); current.getAlertHistory().setAlertState(AlertState.WARNING); dao.merge(current.getAlertHistory()); - + listener.onAlertEvent(event); assertEquals("aggregate_test", ref.get().getName()); assertEquals(AlertState.WARNING, ref.get().getState()); @@ -412,15 +425,15 @@ public class AlertDataManagerTest { clusterId.longValue(), "h3", definition.getDefinitionName()); current.getAlertHistory().setAlertState(AlertState.CRITICAL); dao.merge(current.getAlertHistory()); - + listener.onAlertEvent(event); - assertEquals("aggregate_test", ref.get().getName()); + assertEquals("aggregate_test", ref.get().getName()); assertEquals(AlertState.CRITICAL, ref.get().getState()); assertTrue(ref.get().getText().indexOf("3/4") > -1); } - + /** - * Test interface collects aggregate alert invocations + * Test interface collects aggregate alert invocations */ private static interface TestListener { public void catchIt(AlertReceivedEvent event);