AMBARI-7443 - Alerts: Implement Email Dispatcher (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d838ca95 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d838ca95 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d838ca95 Branch: refs/heads/trunk Commit: d838ca95b644a3222c27b7415f2711373affcd83 Parents: e6cc058 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Mon Sep 22 23:45:06 2014 -0700 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Tue Sep 23 09:02:11 2014 -0700 ---------------------------------------------------------------------- .../internal/AlertTargetResourceProvider.java | 15 +- .../server/events/AlertStateChangeEvent.java | 2 +- .../events/listeners/AlertReceivedListener.java | 46 +++-- .../listeners/AlertServiceStateListener.java | 1 - .../notifications/DispatchCredentials.java | 44 +++++ .../server/notifications/DispatchFactory.java | 16 ++ .../server/notifications/Notification.java | 18 ++ .../ambari/server/notifications/Recipient.java | 45 +++++ .../dispatchers/EmailDispatcher.java | 129 +++++++++++++- .../apache/ambari/server/orm/dao/AlertsDAO.java | 114 +++++++----- .../server/orm/entities/AlertCurrentEntity.java | 13 +- .../services/AlertNoticeDispatchService.java | 175 ++++++++++++++++++- .../stacks/HDP/2.0.6/services/HDFS/alerts.json | 2 +- .../AlertTargetResourceProviderTest.java | 58 ++++++ .../notifications/EmailDispatcherTest.java | 117 +++++++++++++ .../server/notifications/MockDispatcher.java | 46 +++++ .../ambari/server/orm/dao/AlertsDAOTest.java | 133 +++++++++----- .../alerts/AlertStateChangedEventTest.java | 139 +++++++++++++++ 18 files changed, 984 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProvider.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProvider.java index f2b82d6..b0cc52b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProvider.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.internal; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,7 +46,6 @@ import org.apache.ambari.server.state.alert.AlertTarget; import org.apache.commons.lang.StringUtils; import com.google.gson.Gson; -import com.google.gson.JsonObject; import com.google.inject.Inject; import com.google.inject.Injector; @@ -351,22 +351,19 @@ public class AlertTargetResourceProvider extends * {@code null} if none. 
*/ private String extractProperties( Map<String, Object> requestMap ){ - JsonObject jsonObject = new JsonObject(); + Map<String, Object> normalizedMap = new HashMap<String, Object>( + requestMap.size()); + for (Entry<String, Object> entry : requestMap.entrySet()) { String key = entry.getKey(); String propCat = PropertyHelper.getPropertyCategory(key); if (propCat.equals(ALERT_TARGET_PROPERTIES)) { String propKey = PropertyHelper.getPropertyName(key); - jsonObject.addProperty(propKey, entry.getValue().toString()); + normalizedMap.put(propKey, entry.getValue()); } } - String properties = null; - if (jsonObject.entrySet().size() > 0) { - properties = jsonObject.toString(); - } - - return properties; + return s_gson.toJson(normalizedMap); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java index ab2c3dd..efb7119 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java @@ -25,7 +25,7 @@ import org.apache.ambari.server.state.AlertState; * The {@link AlertStateChangeEvent} is fired when an {@link Alert} instance has * its {@link AlertState} changed. */ -public final class AlertStateChangeEvent extends AlertEvent { +public class AlertStateChangeEvent extends AlertEvent { /** * The prior alert state. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java index fb7a608..e87ba7d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java @@ -28,8 +28,8 @@ import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertHistoryEntity; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; @@ -46,7 +46,7 @@ public class AlertReceivedListener { /** * Logger. */ - private static Log LOG = LogFactory.getLog(AlertReceivedListener.class); + private static final Logger LOG = LoggerFactory.getLogger(AlertReceivedListener.class); @Inject private AlertsDAO m_alertsDao; @@ -70,30 +70,33 @@ public class AlertReceivedListener { m_alertEventPublisher.register(this); } - /** - * Adds an alert. Checks for a new state before creating a new history record. + * Adds an alert. Checks for a new state before creating a new history record. 
* - * @param clusterId the id for the cluster - * @param alert the alert to add + * @param clusterId + * the id for the cluster + * @param alert + * the alert to add */ @Subscribe @AllowConcurrentEvents public void onAlertEvent(AlertReceivedEvent event) { - LOG.debug(event); + if (LOG.isDebugEnabled()) { + LOG.debug(event.toString()); + } long clusterId = event.getClusterId(); Alert alert = event.getAlert(); AlertCurrentEntity current = null; - + if (null == alert.getHost()) { current = m_alertsDao.findCurrentByNameNoHost(clusterId, alert.getName()); } else { current = m_alertsDao.findCurrentByHostAndName(clusterId, alert.getHost(), alert.getName()); } - + if (null == current) { AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId, alert.getName()); @@ -111,13 +114,20 @@ public class AlertReceivedListener { current.setLatestTimestamp(Long.valueOf(alert.getTimestamp())); current.setLatestText(alert.getText()); - m_alertsDao.merge(current); + current = m_alertsDao.merge(current); } else { - AlertState oldState = current.getAlertHistory().getAlertState(); + LOG.debug( + "Alert State Changed: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}", + current.getAlertId(), current.getLatestTimestamp(), + current.getAlertHistory().getAlertId(), + current.getAlertHistory().getAlertState()); + + AlertHistoryEntity oldHistory = current.getAlertHistory(); + AlertState oldState = oldHistory.getAlertState(); // insert history, update current AlertHistoryEntity history = createHistory(clusterId, - current.getAlertHistory().getAlertDefinition(), alert); + oldHistory.getAlertDefinition(), alert); // manually create the new history entity since we are merging into // an existing current entity @@ -127,8 +137,14 @@ public class AlertReceivedListener { current.setLatestTimestamp(Long.valueOf(alert.getTimestamp())); current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp())); - m_alertsDao.merge(current); - + current = m_alertsDao.merge(current); + + 
LOG.debug( + "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}", + current.getAlertId(), current.getLatestTimestamp(), + current.getAlertHistory().getAlertId(), + current.getAlertHistory().getAlertState()); + // broadcast the alert changed event for other subscribers AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent( event.getClusterId(), event.getAlert(), current.getAlertHistory(), http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java index f1ce617..6215cc1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java @@ -94,7 +94,6 @@ public class AlertServiceStateListener { publisher.register(this); } - /** * Handles service installed events by populating the database with all known * alert definitions for the newly installed service and creates the service's http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCredentials.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCredentials.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCredentials.java new file mode 100644 index 0000000..9514474 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCredentials.java @@ -0,0 +1,44 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +/** + * The {@link DispatchCredentials} represent a generic username/password model + * that can be passed to a {@link NotificationDispatcher} in order to authenticate + * with a backend dispatcher. + */ +public class DispatchCredentials { + + /** + * The username. + */ + public String UserName; + + /** + * The password. + */ + public String Password; + + /** + * Constructor. 
+ * + */ + public DispatchCredentials() { + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java index 3414035..13f2da2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java @@ -50,6 +50,22 @@ public final class DispatchFactory { } /** + * Registers a dispatcher instance with a type. + * + * @param type + * the type + * @param dispatcher + * the dispatcher to register with the type. + */ + public void register(String type, NotificationDispatcher dispatcher) { + if (null == dispatcher) { + m_dispatchers.remove(type); + } else { + m_dispatchers.put(type, dispatcher); + } + } + + /** * Gets a dispatcher based on the type. 
* * @param type http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java index 08c5242..12dffd7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.notifications; import java.util.List; +import java.util.Map; /** * The {@link Notification} class is a generic way to relay content through an @@ -36,6 +37,23 @@ public class Notification { public String Body; /** + * The optional recipients of the notification. Some dispatchers may not + * require explicit recipients. + */ + public List<Recipient> Recipients; + + /** + * A map of all of the properties that a {@link NotificationDispatcher} needs + * in order to dispatch this notification. + */ + public Map<String, String> DispatchProperties; + + /** + * The optional credentials used to authenticate with the dispatcher. + */ + public DispatchCredentials Credentials; + + /** * An optional callback implementation that the dispatcher can use to report * success/failure on delivery. 
*/ http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/notifications/Recipient.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/Recipient.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Recipient.java new file mode 100644 index 0000000..933038b --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Recipient.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +/** + * The {@link Recipient} class represents a target of a {@link Notification}. + */ +public class Recipient { + + /** + * A string that the concrete {@link NotificationDispatcher} can use to build + * a backend recipient that will work with the dispatch mechanism. + */ + public String Identifier; + + /** + * Constructor. 
+ * + */ + public Recipient() { + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return Identifier; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java index d0858d3..a5dad84 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java @@ -17,8 +17,24 @@ */ package org.apache.ambari.server.notifications.dispatchers; -import org.apache.ambari.server.notifications.NotificationDispatcher; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Timer; + +import javax.mail.Authenticator; +import javax.mail.Message; +import javax.mail.Message.RecipientType; +import javax.mail.MessagingException; +import javax.mail.PasswordAuthentication; +import javax.mail.Session; +import javax.mail.Transport; +import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeMessage; + +import org.apache.ambari.server.notifications.DispatchCredentials; import org.apache.ambari.server.notifications.Notification; +import org.apache.ambari.server.notifications.NotificationDispatcher; +import org.apache.ambari.server.notifications.Recipient; import org.apache.ambari.server.state.alert.TargetType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +43,14 @@ import com.google.inject.Singleton; /** * The {@link EmailDispatcher} class is used to dispatch {@link Notification} - * via JavaMail. + * via JavaMail. 
This class currently does not attempt to reuse an existing + * {@link Session} or {@link Transport} since each {@link Notification} could + * have a different target server with different properties. + * <p/> + * In the future, this class could keep various {@link Transport} instances open + * on a {@link Timer}, but those instances would need to be hashed so that the + * proper instance is retrieved from the properties of the incoming + * {@link Notification}. */ @Singleton public class EmailDispatcher implements NotificationDispatcher { @@ -52,9 +75,105 @@ public class EmailDispatcher implements NotificationDispatcher { public void dispatch(Notification notification) { LOG.info("Sending email: {}", notification); - // callback to inform the interested parties about the successful dispatch - if (null != notification.Callback) { - notification.Callback.onSuccess(notification.CallbackIds); + if (null == notification.DispatchProperties) { + LOG.error("Unable to dispatch an email notification that does not contain SMTP properties"); + + if (null != notification.Callback) { + notification.Callback.onFailure(notification.CallbackIds); + } + + return; + } + + // convert properties to JavaMail properties + Properties properties = new Properties(); + for (Entry<String, String> entry : notification.DispatchProperties.entrySet()) { + properties.put(entry.getKey(), entry.getValue()); + } + + // notifications must have recipients + if (null == notification.Recipients) { + LOG.error("Unable to dispatch an email notification that does not have recipients"); + + if (null != notification.Callback) { + notification.Callback.onFailure(notification.CallbackIds); + } + + return; + } + + // create a simple email authentication for username/password + final Session session; + EmailAuthenticator authenticator = null; + + if (null != notification.Credentials) { + authenticator = new EmailAuthenticator(notification.Credentials); + } + + session = Session.getInstance(properties, 
authenticator); + + try { + Message message = new MimeMessage(session); + + for (Recipient recipient : notification.Recipients) { + InternetAddress address = new InternetAddress(recipient.Identifier); + message.addRecipient(RecipientType.TO, address); + } + + message.setSubject(notification.Subject); + message.setText(notification.Body); + + Transport.send(message); + + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully dispatched email to {}", + notification.Recipients); + } + + // callback to inform the interested parties about the successful dispatch + if (null != notification.Callback) { + notification.Callback.onSuccess(notification.CallbackIds); + } + } catch (Exception exception) { + LOG.error("Unable to dispatch notification via Email", exception); + + // callback failure + if (null != notification.Callback) { + notification.Callback.onFailure(notification.CallbackIds); + } + } finally { + try { + session.getTransport().close(); + } catch (MessagingException me) { + LOG.warn("Dispatcher unable to close SMTP transport", me); + } + } + } + + /** + * The {@link EmailAuthenticator} class is used to provide a username and + * password combination to an SMTP server. + */ + private static final class EmailAuthenticator extends Authenticator{ + + private final DispatchCredentials m_credentials; + + /** + * Constructor. 
+ * + * @param credentials + */ + private EmailAuthenticator(DispatchCredentials credentials) { + m_credentials = credentials; + } + + /** + * {@inheritDoc} + */ + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(m_credentials.UserName, + m_credentials.Password); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java index aba41a5..a28b448 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java @@ -30,6 +30,8 @@ import org.apache.ambari.server.orm.entities.AlertCurrentEntity; import org.apache.ambari.server.orm.entities.AlertHistoryEntity; import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.alert.Scope; +import org.eclipse.persistence.config.HintValues; +import org.eclipse.persistence.config.QueryHints; import com.google.inject.Inject; import com.google.inject.Provider; @@ -57,7 +59,7 @@ public class AlertsDAO { /** * Gets an alert with the specified ID. - * + * * @param alertId * the ID of the alert to retrieve. * @return the alert or {@code null} if none exists. @@ -68,7 +70,7 @@ public class AlertsDAO { /** * Gets all alerts stored in the database across all clusters. - * + * * @return all alerts or an empty list if none exist (never {@code null}). */ public List<AlertHistoryEntity> findAll() { @@ -80,7 +82,7 @@ public class AlertsDAO { /** * Gets all alerts stored in the database for the given cluster. - * + * * @param clusterId * the ID of the cluster. 
* @return all alerts in the specified cluster or an empty list if none exist @@ -98,7 +100,7 @@ public class AlertsDAO { /** * Gets all alerts stored in the database for the given cluster that have one * of the specified alert states. - * + * * @param clusterId * the ID of the cluster. * @param alertStates @@ -126,7 +128,7 @@ public class AlertsDAO { * Gets all alerts stored in the database for the given cluster and that fall * withing the specified date range. Dates are expected to be in milliseconds * since the epoch, normalized to UTC time. - * + * * @param clusterId * the ID of the cluster. * @param startDate @@ -141,8 +143,9 @@ public class AlertsDAO { */ public List<AlertHistoryEntity> findAll(long clusterId, Date startDate, Date endDate) { - if (null == startDate && null == endDate) + if (null == startDate && null == endDate) { return Collections.emptyList(); + } TypedQuery<AlertHistoryEntity> query = null; @@ -174,15 +177,16 @@ public class AlertsDAO { query.setParameter("beforeDate", endDate.getTime()); } - if (null == query) + if (null == query) { return Collections.emptyList(); + } return daoUtils.selectList(query); } /** * Gets the current alerts. - * + * * @return the current alerts or an empty list if none exist (never * {@code null}). */ @@ -196,7 +200,7 @@ public class AlertsDAO { /** * Gets a current alert with the specified ID. - * + * * @param alertId * the ID of the alert to retrieve. * @return the alert or {@code null} if none exists. @@ -208,7 +212,7 @@ public class AlertsDAO { /** * Gets the current alerts for a given cluster. - * + * * @return the current alerts for the given clusteror an empty list if none * exist (never {@code null}). 
*/ @@ -218,14 +222,15 @@ public class AlertsDAO { "AlertCurrentEntity.findByCluster", AlertCurrentEntity.class); query.setParameter("clusterId", Long.valueOf(clusterId)); + query = setQueryRefreshHint(query); return daoUtils.selectList(query); } - + /** * Retrieves the summary information for a particular scope. The result is a DTO * since the columns are aggregated and don't fit to an entity. - * + * * @param clusterId the cluster id * @param serviceName the service name. Use {@code null} to not filter on service. * @param hostName the host name. Use {@code null} to not filter on host. @@ -240,41 +245,41 @@ public class AlertsDAO { sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END), "); sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END)) "); sb.append("FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId"); - + if (null != serviceName) { sb.append(" AND history.serviceName = :serviceName"); } - + if (null != hostName) { sb.append(" AND history.hostName = :hostName"); } - + String str = String.format(sb.toString(), AlertSummaryDTO.class.getName(), AlertState.class.getName(), AlertState.OK.name(), AlertState.class.getName(), AlertState.WARNING.name(), AlertState.class.getName(), AlertState.CRITICAL.name(), AlertState.class.getName(), AlertState.UNKNOWN.name()); - + TypedQuery<AlertSummaryDTO> query = entityManagerProvider.get().createQuery( str, AlertSummaryDTO.class); - + query.setParameter("clusterId", Long.valueOf(clusterId)); - + if (null != serviceName) { query.setParameter("serviceName", serviceName); } - + if (null != hostName) { query.setParameter("hostName", hostName); } - + return daoUtils.selectSingle(query); } - + /** * Gets the current alerts for a given service. - * + * * @return the current alerts for the given service or an empty list if none * exist (never {@code null}). 
*/ @@ -288,12 +293,13 @@ public class AlertsDAO { query.setParameter("serviceName", serviceName); query.setParameter("inlist", EnumSet.of(Scope.ANY, Scope.SERVICE)); + query = setQueryRefreshHint(query); return daoUtils.selectList(query); } /** * Gets the current alerts for a given host. - * + * * @return the current alerts for the given host or an empty list if none * exist (never {@code null}). */ @@ -307,13 +313,14 @@ public class AlertsDAO { query.setParameter("hostName", hostName); query.setParameter("inlist", EnumSet.of(Scope.ANY, Scope.HOST)); + query = setQueryRefreshHint(query); return daoUtils.selectList(query); } - + @RequiresSession public AlertCurrentEntity findCurrentByHostAndName(long clusterId, String hostName, String alertName) { - + TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.findByHostAndName", AlertCurrentEntity.class); @@ -321,6 +328,7 @@ public class AlertsDAO { query.setParameter("hostName", hostName); query.setParameter("definitionName", alertName); + query = setQueryRefreshHint(query); return daoUtils.selectOne(query); } @@ -328,7 +336,7 @@ public class AlertsDAO { * Removes alert history and current alerts for the specified alert defintiion * ID. This will invoke {@link EntityManager#clear()} when completed since the * JPQL statement will remove entries without going through the EM. - * + * * @param definitionId * the ID of the definition to remove. */ @@ -352,7 +360,7 @@ public class AlertsDAO { /** * Remove a current alert whose history entry matches the specfied ID. - * + * * @param historyId the ID of the history entry. * @return the number of alerts removed. */ @@ -367,7 +375,7 @@ public class AlertsDAO { /** * Persists a new alert. - * + * * @param alert * the alert to persist (not {@code null}). */ @@ -378,7 +386,7 @@ public class AlertsDAO { /** * Refresh the state of the alert from the database. - * + * * @param alert * the alert to refresh (not {@code null}). 
*/ @@ -389,7 +397,7 @@ public class AlertsDAO { /** * Merge the speicified alert with the existing alert in the database. - * + * * @param alert * the alert to merge (not {@code null}). * @return the updated alert with merged content (never {@code null}). @@ -401,7 +409,7 @@ public class AlertsDAO { /** * Removes the specified alert from the database. - * + * * @param alert * the alert to remove. */ @@ -415,7 +423,7 @@ public class AlertsDAO { /** * Persists a new current alert. - * + * * @param alert * the current alert to persist (not {@code null}). */ @@ -426,7 +434,7 @@ public class AlertsDAO { /** * Refresh the state of the current alert from the database. - * + * * @param alert * the current alert to refresh (not {@code null}). */ @@ -437,7 +445,7 @@ public class AlertsDAO { /** * Merge the speicified current alert with the existing alert in the database. - * + * * @param alert * the current alert to merge (not {@code null}). * @return the updated current alert with merged content (never {@code null}). @@ -449,7 +457,7 @@ public class AlertsDAO { /** * Removes the specified current alert from the database. - * + * * @param alert * the current alert to remove. */ @@ -457,9 +465,9 @@ public class AlertsDAO { public void remove(AlertCurrentEntity alert) { entityManagerProvider.get().remove(merge(alert)); } - + /** - * Finds the aggregate counts for an alert name, across all hosts. + * Finds the aggregate counts for an alert name, across all hosts. 
* @param clusterId the cluster id * @param alertName the name of the alert to find the aggregate * @return the summary data @@ -474,22 +482,22 @@ public class AlertsDAO { sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END)) "); sb.append("FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId"); sb.append(" AND history.alertDefinition.definitionName = :definitionName"); - + String str = String.format(sb.toString(), AlertSummaryDTO.class.getName(), AlertState.class.getName(), AlertState.WARNING.name(), AlertState.class.getName(), AlertState.CRITICAL.name(), AlertState.class.getName(), AlertState.UNKNOWN.name()); - + TypedQuery<AlertSummaryDTO> query = entityManagerProvider.get().createQuery( str, AlertSummaryDTO.class); - + query.setParameter("clusterId", Long.valueOf(clusterId)); query.setParameter("definitionName", alertName); - - return daoUtils.selectSingle(query); + + return daoUtils.selectSingle(query); } - + /** * Locate the current alert for the provided service and alert name, but when * host is not set ({@code IS NULL}). @@ -500,7 +508,7 @@ public class AlertsDAO { */ @RequiresSession public AlertCurrentEntity findCurrentByNameNoHost(long clusterId, String alertName) { - + TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.findByNameAndNoHost", AlertCurrentEntity.class); @@ -510,4 +518,22 @@ public class AlertsDAO { return daoUtils.selectOne(query); } + /** + * Sets {@link QueryHints#REFRESH} on the specified query so that child + * entities are not stale. + * <p/> + * See <a + * href="https://bugs.eclipse.org/bugs/show_bug.cgi?id=398067">https://bugs + * .eclipse.org/bugs/show_bug.cgi?id=398067</a> + * + * @param query + * @return + */ + private <T> TypedQuery<T> setQueryRefreshHint(TypedQuery<T> query) { + // !!! 
https://bugs.eclipse.org/bugs/show_bug.cgi?id=398067 + // ensure that an associated entity with a JOIN is not stale; this causes + // the associated AlertHistoryEntity to be stale + query.setHint(QueryHints.REFRESH, HintValues.TRUE); + return query; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java index 5b54d57..8ca297b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java @@ -47,11 +47,11 @@ import org.apache.ambari.server.state.MaintenanceState; @TableGenerator(name = "alert_current_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "alert_current_id_seq", initialValue = 0, allocationSize = 1) @NamedQueries({ @NamedQuery(name = "AlertCurrentEntity.findAll", query = "SELECT alert FROM AlertCurrentEntity alert"), - @NamedQuery(name = "AlertCurrentEntity.findByCluster", query = "SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId"), - @NamedQuery(name = "AlertCurrentEntity.findByService", query = "SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId AND history.serviceName = :serviceName AND history.alertDefinition.scope IN :inlist"), - @NamedQuery(name = "AlertCurrentEntity.findByHost", query = "SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId AND history.hostName = :hostName AND 
history.alertDefinition.scope IN :inlist"), - @NamedQuery(name = "AlertCurrentEntity.findByHostAndName", query = "SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId AND history.alertDefinition.definitionName = :definitionName AND history.hostName = :hostName"), - @NamedQuery(name = "AlertCurrentEntity.findByNameAndNoHost", query = "SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE history.clusterId = :clusterId AND history.alertDefinition.definitionName = :definitionName AND history.hostName IS NULL"), + @NamedQuery(name = "AlertCurrentEntity.findByCluster", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId"), + @NamedQuery(name = "AlertCurrentEntity.findByService", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.serviceName = :serviceName AND alert.alertHistory.alertDefinition.scope IN :inlist"), + @NamedQuery(name = "AlertCurrentEntity.findByHost", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.hostName = :hostName AND alert.alertHistory.alertDefinition.scope IN :inlist"), + @NamedQuery(name = "AlertCurrentEntity.findByHostAndName", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName = :hostName"), + @NamedQuery(name = "AlertCurrentEntity.findByNameAndNoHost", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName IS NULL"), @NamedQuery(name = "AlertCurrentEntity.removeByHistoryId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.alertId = :historyId"), 
@NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId = :definitionId") }) public class AlertCurrentEntity { @@ -77,8 +77,7 @@ public class AlertCurrentEntity { /** * Unidirectional one-to-one association to {@link AlertHistoryEntity} */ - @OneToOne(cascade = { CascadeType.PERSIST, CascadeType.MERGE, - CascadeType.REFRESH }) + @OneToOne(cascade = { CascadeType.PERSIST, CascadeType.REFRESH }) @JoinColumn(name = "history_id", unique = true, nullable = false) private AlertHistoryEntity alertHistory; http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java index 7025e14..72487b3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java @@ -17,10 +17,13 @@ */ package org.apache.ambari.server.state.services; +import java.lang.reflect.Type; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -29,19 +32,32 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.server.events.AlertEvent; +import org.apache.ambari.server.notifications.DispatchCallback; +import org.apache.ambari.server.notifications.DispatchCredentials; import 
org.apache.ambari.server.notifications.DispatchFactory; import org.apache.ambari.server.notifications.DispatchRunnable; -import org.apache.ambari.server.notifications.DispatchCallback; -import org.apache.ambari.server.notifications.NotificationDispatcher; import org.apache.ambari.server.notifications.Notification; +import org.apache.ambari.server.notifications.NotificationDispatcher; +import org.apache.ambari.server.notifications.Recipient; import org.apache.ambari.server.orm.dao.AlertDispatchDAO; +import org.apache.ambari.server.orm.entities.AlertHistoryEntity; import org.apache.ambari.server.orm.entities.AlertNoticeEntity; import org.apache.ambari.server.orm.entities.AlertTargetEntity; +import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.NotificationState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.reflect.TypeToken; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -49,6 +65,10 @@ import com.google.inject.Singleton; * The {@link AlertNoticeDispatchService} is used to scan the database for * {@link AlertNoticeEntity} that are in the {@link NotificationState#PENDING}. * It will then process them through the dispatch system. + * <p/> + * The dispatch system will then make a callback to + * {@link AlertNoticeDispatchCallback} so that the {@link NotificationState} can + * be updated to its final value. 
*/ @Singleton public class AlertNoticeDispatchService extends AbstractScheduledService { @@ -59,6 +79,26 @@ public class AlertNoticeDispatchService extends AbstractScheduledService { private static final Logger LOG = LoggerFactory.getLogger(AlertNoticeDispatchService.class); /** + * The property containing the dispatch authentication username. + */ + private static final String AMBARI_DISPATCH_CREDENTIAL_USERNAME = "ambari.dispatch.credential.username"; + + /** + * The property containing the dispatch authentication password. + */ + private static final String AMBARI_DISPATCH_CREDENTIAL_PASSWORD = "ambari.dispatch.credential.password"; + + /** + * The property containing the dispatch recipients + */ + private static final String AMBARI_DISPATCH_RECIPIENTS = "ambari.dispatch.recipients"; + + /** + * Gson used to convert JSON properties to a map. + */ + private final Gson m_gson; + + /** * Dispatch DAO to query pending {@link AlertNoticeEntity} instances from. */ @Inject @@ -84,6 +124,12 @@ public class AlertNoticeDispatchService extends AbstractScheduledService { TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new AlertDispatchThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); + + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(AlertTargetProperties.class, + new AlertTargetPropertyDeserializer()); + + m_gson = gsonBuilder.create(); } /** @@ -122,16 +168,82 @@ public class AlertNoticeDispatchService extends AbstractScheduledService { continue; } + String propertiesJson = target.getProperties(); + AlertTargetProperties targetProperties = m_gson.fromJson(propertiesJson, + AlertTargetProperties.class); + + Map<String, String> properties = targetProperties.Properties; + Notification notification = new Notification(); - notification.Subject = target.getTargetName(); - notification.Body = target.getDescription(); notification.Callback = new AlertNoticeDispatchCallback(); notification.CallbackIds = new 
ArrayList<String>(notices.size()); + // !!! FIXME: temporary until velocity templates are implemented + String subject = "OK ({0}), Warning ({1}), Critical ({2})"; + StringBuilder buffer = new StringBuilder(512); + + int okCount = 0; + int warningCount = 0; + int criticalCount = 0; + for (AlertNoticeEntity notice : notices) { + AlertHistoryEntity history = notice.getAlertHistory(); notification.CallbackIds.add(notice.getUuid()); + + AlertState alertState = history.getAlertState(); + switch (alertState) { + case CRITICAL: + criticalCount++; + break; + case OK: + okCount++; + break; + case UNKNOWN: + // !!! hmmmmmm + break; + case WARNING: + warningCount++; + break; + default: + break; + } + + buffer.append(history.getAlertLabel()); + buffer.append(": "); + buffer.append(history.getAlertText()); + buffer.append("\n"); } + notification.Subject = MessageFormat.format(subject, okCount, + warningCount, criticalCount); + + notification.Body = buffer.toString(); + + // set dispatch credentials + if (properties.containsKey(AMBARI_DISPATCH_CREDENTIAL_USERNAME) + && properties.containsKey(AMBARI_DISPATCH_CREDENTIAL_PASSWORD)) { + DispatchCredentials credentials = new DispatchCredentials(); + credentials.UserName = properties.get(AMBARI_DISPATCH_CREDENTIAL_USERNAME); + credentials.Password = properties.get(AMBARI_DISPATCH_CREDENTIAL_PASSWORD); + notification.Credentials = credentials; + } + + if (null != targetProperties.Recipients) { + List<Recipient> recipients = new ArrayList<Recipient>( + targetProperties.Recipients.size()); + + for (String stringRecipient : targetProperties.Recipients) { + Recipient recipient = new Recipient(); + recipient.Identifier = stringRecipient; + recipients.add(recipient); + } + + notification.Recipients = recipients; + } + + // set all other dispatch properties + notification.DispatchProperties = properties; + NotificationDispatcher dispatcher = m_dispatchFactory.getDispatcher(target.getNotificationType()); DispatchRunnable runnable = new 
DispatchRunnable(dispatcher, notification); @@ -151,6 +263,61 @@ public class AlertNoticeDispatchService extends AbstractScheduledService { } /** + * The {@link AlertTargetProperties} separates out the dispatcher properties + * from the list of recipients which is a JSON array and not a String. + */ + private static final class AlertTargetProperties { + /** + * The properties to pass to the concrete dispatcher. + */ + public Map<String, String> Properties; + + /** + * The recipients of the notice. + */ + public List<String> Recipients; + } + + /** + * The {@link AlertTargetPropertyDeserializer} is used to dump the majority of + * JSON serialized properties into a {@link Map} of {@link String} while at + * the same time, converting + * {@link AlertNoticeDispatchService#AMBARI_DISPATCH_RECIPIENTS} into a list. + */ + private static final class AlertTargetPropertyDeserializer implements + JsonDeserializer<AlertTargetProperties> { + + /** + * {@inheritDoc} + */ + @Override + public AlertTargetProperties deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + + AlertTargetProperties properties = new AlertTargetProperties(); + properties.Properties = new HashMap<String, String>(); + + final JsonObject jsonObject = json.getAsJsonObject(); + Set<Entry<String, JsonElement>> entrySet = jsonObject.entrySet(); + + for (Entry<String, JsonElement> entry : entrySet) { + String entryKey = entry.getKey(); + JsonElement entryValue = entry.getValue(); + + if (entryKey.equals(AMBARI_DISPATCH_RECIPIENTS)) { + Type listType = new TypeToken<List<String>>() {}.getType(); + JsonArray jsonArray = entryValue.getAsJsonArray(); + properties.Recipients = context.deserialize(jsonArray, listType); + } else { + properties.Properties.put(entryKey, entryValue.getAsString()); + } + } + + return properties; + } + } + + /** * A custom {@link ThreadFactory} for the threads that will handle dispatching * {@link AlertNoticeEntity} instances. 
Threads created will have slightly * reduced priority since {@link AlertEvent} instances are not critical to the http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json index 88503af..620c89f 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json @@ -2,7 +2,7 @@ "service": [ { "name": "percent_datanode", - "label": "Percent DataNodes live", + "label": "Percent DataNodes Live", "interval": 1, "scope": "SERVICE", "source": { http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProviderTest.java index 1c964a3..b96a4f3 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertTargetResourceProviderTest.java @@ -187,6 +187,39 @@ public class AlertTargetResourceProviderTest { * @throws Exception */ @Test + public void testCreateWithRecipientArray() throws Exception { + Capture<List<AlertTargetEntity>> listCapture = new Capture<List<AlertTargetEntity>>(); + + m_dao.createTargets(capture(listCapture)); + expectLastCall(); + + replay(m_amc, m_dao); + + AlertTargetResourceProvider provider = 
createProvider(m_amc); + Map<String, Object> requestProps = getRecipientCreationProperties(); + + Request request = PropertyHelper.getCreateRequest( + Collections.singleton(requestProps), null); + provider.createResources(request); + + Assert.assertTrue(listCapture.hasCaptured()); + AlertTargetEntity entity = listCapture.getValue().get(0); + Assert.assertNotNull(entity); + + assertEquals(ALERT_TARGET_NAME, entity.getTargetName()); + assertEquals(ALERT_TARGET_DESC, entity.getDescription()); + assertEquals(ALERT_TARGET_TYPE, entity.getNotificationType()); + assertEquals( + "{\"ambari.dispatch.recipients\":\"[\\\"amb...@ambari.apache.org\\\"]\"}", + entity.getProperties()); + + verify(m_amc, m_dao); + } + + /** + * @throws Exception + */ + @Test @SuppressWarnings("unchecked") public void testUpdateResources() throws Exception { Capture<AlertTargetEntity> entityCapture = new Capture<AlertTargetEntity>(); @@ -331,6 +364,31 @@ public class AlertTargetResourceProviderTest { } /** + * Gets the maps of properties that simulate a deserialized JSON request with + * a nested JSON array. 
+ * + * @return + * @throws Exception + */ + private Map<String, Object> getRecipientCreationProperties() throws Exception { + Map<String, Object> requestProps = new HashMap<String, Object>(); + requestProps.put(AlertTargetResourceProvider.ALERT_TARGET_NAME, + ALERT_TARGET_NAME); + + requestProps.put(AlertTargetResourceProvider.ALERT_TARGET_DESCRIPTION, + ALERT_TARGET_DESC); + + requestProps.put( + AlertTargetResourceProvider.ALERT_TARGET_NOTIFICATION_TYPE, + ALERT_TARGET_TYPE); + + requestProps.put(AlertTargetResourceProvider.ALERT_TARGET_PROPERTIES + + "/ambari.dispatch.recipients", "[\"amb...@ambari.apache.org\"]"); + + return requestProps; + } + + /** * */ private class MockModule implements Module { http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/test/java/org/apache/ambari/server/notifications/EmailDispatcherTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/notifications/EmailDispatcherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/notifications/EmailDispatcherTest.java new file mode 100644 index 0000000..1e7689f --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/notifications/EmailDispatcherTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.state.alert.TargetType; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.util.Modules; + +/** + * + */ +public class EmailDispatcherTest { + + private Injector m_injector; + private DispatchFactory m_dispatchFactory; + + @Before + public void before() throws Exception { + m_injector = Guice.createInjector(Modules.override( + new InMemoryDefaultTestModule()).with(new MockModule())); + + m_dispatchFactory = m_injector.getInstance(DispatchFactory.class); + } + + /** + * Tests that an email without recipients causes a callback error. 
+ */ + @Test + public void testNoRecipients() { + Notification notification = new Notification(); + DispatchCallback callback = EasyMock.createMock(DispatchCallback.class); + notification.Callback = callback; + + List<String> callbackIds = new ArrayList<String>(); + callbackIds.add(UUID.randomUUID().toString()); + notification.CallbackIds = callbackIds; + + callback.onFailure(callbackIds); + + EasyMock.expectLastCall(); + EasyMock.replay(callback); + + NotificationDispatcher dispatcher = m_dispatchFactory.getDispatcher(TargetType.EMAIL.name()); + dispatcher.dispatch(notification); + + EasyMock.verify(callback); + } + + /** + * Tests that an email without properties causes a callback error. + */ + @Test + public void testNoEmailPropeties() { + Notification notification = new Notification(); + DispatchCallback callback = EasyMock.createMock(DispatchCallback.class); + notification.Callback = callback; + notification.Recipients = new ArrayList<Recipient>(); + + Recipient recipient = new Recipient(); + recipient.Identifier = "foo"; + + notification.Recipients.add(recipient); + + List<String> callbackIds = new ArrayList<String>(); + callbackIds.add(UUID.randomUUID().toString()); + notification.CallbackIds = callbackIds; + + callback.onFailure(callbackIds); + + EasyMock.expectLastCall(); + EasyMock.replay(callback); + + NotificationDispatcher dispatcher = m_dispatchFactory.getDispatcher(TargetType.EMAIL.name()); + dispatcher.dispatch(notification); + + EasyMock.verify(callback); + } + + /** + * + */ + private class MockModule implements Module { + /** + * + */ + @Override + public void configure(Binder binder) { + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/test/java/org/apache/ambari/server/notifications/MockDispatcher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/notifications/MockDispatcher.java 
b/ambari-server/src/test/java/org/apache/ambari/server/notifications/MockDispatcher.java new file mode 100644 index 0000000..616551f --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/notifications/MockDispatcher.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +/** + * + */ +public class MockDispatcher implements NotificationDispatcher { + + /** + * Constructor. 
+ * + */ + public MockDispatcher() { + } + + /** + * {@inheritDoc} + */ + @Override + public String getType() { + return "MOCK"; + } + + /** + * {@inheritDoc} + */ + @Override + public void dispatch(Notification notification) { + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java index 9e638e8..6e4d4af 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java @@ -22,6 +22,7 @@ package org.apache.ambari.server.orm.dao; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Calendar; @@ -63,7 +64,7 @@ public class AlertsDAOTest { private AlertDefinitionDAO definitionDao; /** - * + * */ @Before public void setup() throws Exception { @@ -88,7 +89,7 @@ public class AlertsDAOTest { definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); } - + List<AlertDefinitionEntity> definitions = definitionDao.findAll(); assertNotNull(definitions); assertEquals(5, definitions.size()); @@ -142,7 +143,7 @@ public class AlertsDAOTest { } /** - * + * */ @After public void teardown() { @@ -152,7 +153,7 @@ public class AlertsDAOTest { /** - * + * */ @Test public void testFindAll() { @@ -162,7 +163,7 @@ public class AlertsDAOTest { } /** - * + * */ @Test public void testFindAllCurrent() { @@ -172,16 +173,16 @@ public class AlertsDAOTest { } /** - * + * */ @Test public void testFindCurrentByService() { List<AlertCurrentEntity> currentAlerts = 
dao.findCurrent(); AlertCurrentEntity current = currentAlerts.get(0); AlertHistoryEntity history = current.getAlertHistory(); - + assertNotNull(history); - + currentAlerts = dao.findCurrentByService(clusterId, history.getServiceName()); @@ -193,7 +194,7 @@ public class AlertsDAOTest { assertNotNull(currentAlerts); assertEquals(0, currentAlerts.size()); } - + /** * Test looking up current by a host name. */ @@ -211,7 +212,7 @@ public class AlertsDAOTest { hostDef.setSource("HostService"); hostDef.setSourceType(SourceType.SCRIPT); definitionDao.create(hostDef); - + // history for the definition AlertHistoryEntity history = new AlertHistoryEntity(); history.setServiceName(hostDef.getServiceName()); @@ -222,14 +223,14 @@ public class AlertsDAOTest { history.setAlertTimestamp(Long.valueOf(1L)); history.setHostName("h2"); history.setAlertState(AlertState.OK); - + // current for the history AlertCurrentEntity current = new AlertCurrentEntity(); current.setOriginalTimestamp(1L); current.setLatestTimestamp(2L); current.setAlertHistory(history); dao.create(current); - + List<AlertCurrentEntity> currentAlerts = dao.findCurrentByHost(clusterId, history.getHostName()); assertNotNull(currentAlerts); @@ -239,10 +240,10 @@ public class AlertsDAOTest { assertNotNull(currentAlerts); assertEquals(0, currentAlerts.size()); - } + } /** - * + * */ @Test public void testFindByState() { @@ -250,7 +251,7 @@ public class AlertsDAOTest { allStates.add(AlertState.OK); allStates.add(AlertState.WARNING); allStates.add(AlertState.CRITICAL); - + List<AlertHistoryEntity> history = dao.findAll(clusterId, allStates); assertNotNull(history); assertEquals(50, history.size()); @@ -263,21 +264,21 @@ public class AlertsDAOTest { Collections.singletonList(AlertState.CRITICAL)); assertNotNull(history); assertEquals(10, history.size()); - + history = dao.findAll(clusterId, Collections.singletonList(AlertState.WARNING)); assertNotNull(history); - assertEquals(0, history.size()); + assertEquals(0, 
history.size()); } /** - * + * */ @Test public void testFindByDate() { calendar.clear(); calendar.set(2014, Calendar.JANUARY, 1); - + // on or after 1/1/2014 List<AlertHistoryEntity> history = dao.findAll(clusterId, calendar.getTime(), null); @@ -311,21 +312,21 @@ public class AlertsDAOTest { assertNotNull(history); assertEquals(0, history.size()); } - + @Test public void testFindCurrentByHostAndName() throws Exception { AlertCurrentEntity entity = dao.findCurrentByHostAndName(clusterId.longValue(), "h2", "Alert Definition 1"); assertNull(entity); - + entity = dao.findCurrentByHostAndName(clusterId.longValue(), "h1", "Alert Definition 1"); - + assertNotNull(entity); assertNotNull(entity.getAlertHistory()); assertNotNull(entity.getAlertHistory().getAlertDefinition()); } - + /** - * + * */ @Test public void testFindCurrentSummary() throws Exception { @@ -341,12 +342,12 @@ public class AlertsDAOTest { dao.merge(h2); h3.setAlertState(AlertState.UNKNOWN); dao.merge(h3); - + int ok = 0; int warn = 0; int crit = 0; int unk = 0; - + for (AlertCurrentEntity h : dao.findCurrentByCluster(clusterId.longValue())) { switch (h.getAlertHistory().getAlertState()) { case CRITICAL: @@ -362,22 +363,22 @@ public class AlertsDAOTest { warn++; break; } - + } - + summary = dao.findCurrentCounts(clusterId.longValue(), null, null); // !!! db-to-db compare assertEquals(ok, summary.getOkCount()); assertEquals(warn, summary.getWarningCount()); assertEquals(crit, summary.getCriticalCount()); assertEquals(unk, summary.getCriticalCount()); - + // !!! 
expected assertEquals(2, summary.getOkCount()); assertEquals(1, summary.getWarningCount()); assertEquals(1, summary.getCriticalCount()); assertEquals(1, summary.getCriticalCount()); - + summary = dao.findCurrentCounts(clusterId.longValue(), "Service 0", null); assertEquals(1, summary.getOkCount()); assertEquals(0, summary.getWarningCount()); @@ -389,15 +390,14 @@ public class AlertsDAOTest { assertEquals(1, summary.getWarningCount()); assertEquals(1, summary.getCriticalCount()); assertEquals(1, summary.getCriticalCount()); - + summary = dao.findCurrentCounts(clusterId.longValue(), "foo", null); assertEquals(0, summary.getOkCount()); assertEquals(0, summary.getWarningCount()); assertEquals(0, summary.getCriticalCount()); assertEquals(0, summary.getCriticalCount()); - } - + @Test public void testFindAggregates() throws Exception { // definition @@ -412,7 +412,7 @@ public class AlertsDAOTest { definition.setSource("SourceScript"); definition.setSourceType(SourceType.SCRIPT); definitionDao.create(definition); - + // history record #1 and current AlertHistoryEntity history = new AlertHistoryEntity(); history.setAlertDefinition(definition); @@ -425,13 +425,13 @@ public class AlertsDAOTest { history.setComponentName(""); history.setHostName("h1"); history.setServiceName("ServiceName"); - + AlertCurrentEntity current = new AlertCurrentEntity(); current.setAlertHistory(history); current.setLatestTimestamp(Long.valueOf(1L)); current.setOriginalTimestamp(Long.valueOf(1L)); dao.merge(current); - + // history record #2 and current history = new AlertHistoryEntity(); history.setAlertDefinition(definition); @@ -444,35 +444,84 @@ public class AlertsDAOTest { history.setComponentName(""); history.setHostName("h2"); history.setServiceName("ServiceName"); - + current = new AlertCurrentEntity(); current.setAlertHistory(history); current.setLatestTimestamp(Long.valueOf(1L)); current.setOriginalTimestamp(Long.valueOf(1L)); dao.merge(current); - + AlertSummaryDTO summary = 
dao.findAggregateCounts(clusterId.longValue(), "many_per_cluster"); assertEquals(2, summary.getOkCount()); assertEquals(0, summary.getWarningCount()); assertEquals(0, summary.getCriticalCount()); assertEquals(0, summary.getUnknownCount()); - + AlertCurrentEntity c = dao.findCurrentByHostAndName(clusterId.longValue(), "h2", "many_per_cluster"); AlertHistoryEntity h = c.getAlertHistory(); h.setAlertState(AlertState.CRITICAL); dao.merge(h); - + summary = dao.findAggregateCounts(clusterId.longValue(), "many_per_cluster"); assertEquals(2, summary.getOkCount()); assertEquals(0, summary.getWarningCount()); assertEquals(1, summary.getCriticalCount()); assertEquals(0, summary.getUnknownCount()); - + summary = dao.findAggregateCounts(clusterId.longValue(), "foo"); assertEquals(0, summary.getOkCount()); assertEquals(0, summary.getWarningCount()); assertEquals(0, summary.getCriticalCount()); assertEquals(0, summary.getUnknownCount()); - } + } + + /** + * Tests <a + * href="https://bugs.eclipse.org/bugs/show_bug.cgi?id=398067">https:/ + * /bugs.eclipse.org/bugs/show_bug.cgi?id=398067</a> which causes an inner + * entity to be stale. 
+ */ + @Test + public void testJPAInnerEntityStaleness() { + List<AlertCurrentEntity> currents = dao.findCurrent(); + AlertCurrentEntity current = currents.get(0); + AlertHistoryEntity oldHistory = current.getAlertHistory(); + + AlertHistoryEntity newHistory = new AlertHistoryEntity(); + newHistory.setAlertDefinition(oldHistory.getAlertDefinition()); + newHistory.setAlertInstance(oldHistory.getAlertInstance()); + newHistory.setAlertLabel(oldHistory.getAlertLabel()); + + if (oldHistory.getAlertState() == AlertState.OK) { + newHistory.setAlertState(AlertState.CRITICAL); + } else { + newHistory.setAlertState(AlertState.OK); + } + + newHistory.setAlertText("New History"); + newHistory.setClusterId(oldHistory.getClusterId()); + newHistory.setAlertTimestamp(System.currentTimeMillis()); + newHistory.setComponentName(oldHistory.getComponentName()); + newHistory.setHostName(oldHistory.getHostName()); + newHistory.setServiceName(oldHistory.getServiceName()); + + dao.create(newHistory); + + assertTrue(newHistory.getAlertId().longValue() != oldHistory.getAlertId().longValue()); + + current.setAlertHistory(newHistory); + dao.merge(current); + + AlertCurrentEntity newCurrent = dao.findCurrentByHostAndName( + newHistory.getClusterId(), + newHistory.getHostName(), + newHistory.getAlertDefinition().getDefinitionName()); + + assertEquals(newHistory.getAlertId(), + newCurrent.getAlertHistory().getAlertId()); + + assertEquals(newHistory.getAlertState(), + newCurrent.getAlertHistory().getAlertState()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d838ca95/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java new file mode 100644 index 0000000..312f297 
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alerts;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.util.Modules;
+
+/**
+ * Tests that {@link AlertStateChangeEvent} instances cause
+ * {@link AlertNoticeEntity} instances to be created.
+ */
+public class AlertStateChangedEventTest {
+
+  private AlertEventPublisher eventPublisher;
+  private AlertDispatchDAO dispatchDao;
+  private Injector injector;
+
+  /**
+   * Creates an injector from {@link InMemoryDefaultTestModule} overridden by
+   * {@link MockModule}, instantiates the alert listeners, and looks up the
+   * DAO and event publisher used by the test.
+   *
+   * @throws Exception
+   *           if the injector or any of its instances cannot be created
+   */
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
+
+    injector.getInstance(GuiceJpaInitializer.class);
+
+    // force singleton init via Guice so the listener registers with the bus
+    injector.getInstance(AlertServiceStateListener.class);
+    injector.getInstance(AlertStateChangedListener.class);
+
+    // MockModule binds AlertDispatchDAO to a mock, so this is that mock
+    dispatchDao = injector.getInstance(AlertDispatchDAO.class);
+    eventPublisher = injector.getInstance(AlertEventPublisher.class);
+  }
+
+  /**
+   * Stops the {@link PersistService} and releases the injector.
+   *
+   * @throws Exception
+   *           if the persistence service cannot be stopped
+   */
+  @After
+  public void teardown() throws Exception {
+    injector.getInstance(PersistService.class).stop();
+    injector = null;
+  }
+
+  /**
+   * Tests that an {@link AlertStateChangeEvent} causes
+   * {@link AlertNoticeEntity} instances to be written.
+   *
+   * @throws Exception
+   *           if the wait for the asynchronous delivery is interrupted
+   */
+  @Test
+  public void testAlertNoticeCreationFromEvent() throws Exception {
+    AlertHistoryEntity history = EasyMock.createNiceMock(AlertHistoryEntity.class);
+    AlertStateChangeEvent event = EasyMock.createNiceMock(AlertStateChangeEvent.class);
+    EasyMock.expect(event.getNewHistoricalEntry()).andReturn(history).atLeastOnce();
+
+    EasyMock.replay(history, event);
+
+    // publishing is asynchronous, so wait before verifying the expectations
+    eventPublisher.publish(event);
+    Thread.sleep(2000);
+
+    EasyMock.verify(dispatchDao, history, event);
+  }
+
+  /**
+   * Guice module that replaces the {@link AlertDispatchDAO} binding with a
+   * mock. It is a static nested class because it uses no state of the
+   * enclosing test instance.
+   */
+  private static class MockModule implements Module {
+    /**
+     * Binds {@link AlertDispatchDAO} to a replayed mock that expects one
+     * group lookup, returning a single group with a single target, and one
+     * {@link AlertNoticeEntity} creation.
+     */
+    @Override
+    public void configure(Binder binder) {
+      AlertTargetEntity alertTarget = EasyMock.createMock(AlertTargetEntity.class);
+      AlertGroupEntity alertGroup = EasyMock.createMock(AlertGroupEntity.class);
+      List<AlertGroupEntity> groups = new ArrayList<AlertGroupEntity>();
+      Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
+
+      targets.add(alertTarget);
+      groups.add(alertGroup);
+
+      EasyMock.expect(alertGroup.getAlertTargets()).andReturn(targets).once();
+
+      AlertDispatchDAO dispatchDao = EasyMock.createMock(AlertDispatchDAO.class);
+      EasyMock.expect(
+          dispatchDao.findGroupsByDefinition(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
+          groups).once();
+
+      dispatchDao.create(EasyMock.anyObject(AlertNoticeEntity.class));
+      EasyMock.expectLastCall().once();
+
+      binder.bind(AlertDispatchDAO.class).toInstance(dispatchDao);
+
+      EasyMock.replay(alertTarget, alertGroup, dispatchDao);
+    }
+  }
+}