Repository: ambari Updated Branches: refs/heads/branch-2.1 5b05ece16 -> 4de51625d
AMBARI-13570 - Reduce Load On Database By Caching Alerts (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4de51625 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4de51625 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4de51625 Branch: refs/heads/branch-2.1 Commit: 4de51625dbe3c208be8422d8b17b4685f39b702a Parents: 5b05ece Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Mon Oct 26 21:31:40 2015 -0400 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Wed Oct 28 10:12:02 2015 -0400 ---------------------------------------------------------------------- .../apache/ambari/annotations/Experimental.java | 7 + .../ambari/annotations/ExperimentalFeature.java | 39 ++ .../actionmanager/ActionDBAccessorImpl.java | 5 +- .../api/services/PersistKeyValueService.java | 18 +- .../server/configuration/Configuration.java | 82 ++- .../listeners/alerts/AlertReceivedListener.java | 26 +- .../apache/ambari/server/orm/dao/AlertsDAO.java | 518 ++++++++++++++++++- .../server/orm/entities/AlertCurrentEntity.java | 21 + .../state/services/CachedAlertFlushService.java | 97 ++++ .../server/configuration/ConfigurationTest.java | 20 +- 10 files changed, 785 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java index 5a4915a..f51991c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java +++ b/ambari-server/src/main/java/org/apache/ambari/annotations/Experimental.java @@ -32,4 +32,11 @@ import 
java.lang.annotation.Target; ElementType.ANNOTATION_TYPE, ElementType.PACKAGE, ElementType.FIELD, ElementType.LOCAL_VARIABLE }) public @interface Experimental { + + /** + * The logical feature set that an experimental area of code belongs to. + * + * @return + */ + ExperimentalFeature feature(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java new file mode 100644 index 0000000..f29ed40 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.annotations; + +import java.util.concurrent.Executor; + +/** + * The {@link ExperimentalFeature} enumeration is meant to be used with the + * {@link Experimental} annotation to indicate which feature set experimental + * code belongs to. 
+ */ +public enum ExperimentalFeature { + /** + * The processing of arbitrary, atomic list elements by an {@link Executor} in + * order to arrive at a full processed list faster. + */ + PARALLEL_PROCESSING, + + /** + * The caching of current alert information in order to reduce overall load on + * the database by preventing frequent updates and JPA entity invalidation. + */ + ALERT_CACHING +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 21d9f2b..99c327f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; @@ -216,13 +217,13 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { * {@inheritDoc} */ @Override - @Experimental + @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) public List<Stage> getStagesInProgress() { List<StageEntity> stageEntities = stageDAO.findByCommandStatuses( HostRoleStatus.IN_PROGRESS_STATUSES); // experimentally enable parallel stage processing - @Experimental + @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled(); if 
(useConcurrentStageProcessing) { ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/api/services/PersistKeyValueService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/PersistKeyValueService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/PersistKeyValueService.java index 9b942e5..83217c5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/PersistKeyValueService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/PersistKeyValueService.java @@ -23,7 +23,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import javax.ws.rs.*; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import javax.xml.bind.JAXBException; @@ -64,25 +70,25 @@ public class PersistKeyValueService { @PUT @Produces("text/plain") public String store(String values) throws IOException, JAXBException { - LOG.info("Received message from UI " + values); + LOG.debug("Received message from UI " + values); Collection<String> valueCollection = StageUtils.fromJson(values, Collection.class); Collection<String> keys = new ArrayList<String>(valueCollection.size()); for (String s : valueCollection) { keys.add(persistKeyVal.put(s)); } String stringRet = StageUtils.jaxbToString(keys); - LOG.info("Returning " + stringRet); + LOG.debug("Returning " + stringRet); return stringRet; } - + @GET @Produces("text/plain") @Path("{keyName}") public String getKey( @PathParam("keyName") String keyName) { - LOG.info("Looking for keyName " + keyName); + LOG.debug("Looking for keyName " + 
keyName); return persistKeyVal.getValue(keyName); } - + @GET @Produces("text/plain") public String getAllKeyValues() throws JAXBException, IOException { http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index e97a54e..26e20e8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -23,8 +23,10 @@ import com.google.gson.JsonParser; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; import org.apache.ambari.server.orm.JPATableGenerationStrategy; import org.apache.ambari.server.orm.PersistenceType; import org.apache.ambari.server.orm.entities.StageEntity; @@ -450,8 +452,6 @@ public class Configuration { private static final String TIMELINE_METRICS_CACHE_HEAP_PERCENT = "server.timeline.metrics.cache.heap.percent"; private static final String DEFAULT_TIMELINE_METRICS_CACHE_HEAP_PERCENT = "15%"; - // experimental options - /** * Governs the use of {@link Parallel} to process {@link StageEntity} * instances into {@link Stage}. @@ -463,11 +463,45 @@ public class Configuration { */ private static final String ALERT_TEMPLATE_FILE = "alerts.template.file"; + /** + * The maximum number of threads which will handle published alert events. 
+ */ public static final String ALERTS_EXECUTION_SCHEDULER_THREADS_KEY = "alerts.execution.scheduler.maxThreads"; + + /** + * The default core threads for handling published alert events + */ public static final String ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT = "2"; /** - * For HTTP Response header configuration for Ambari Server UI + * If {@code true} then alert information is cached and not immediately + * persisted in the database. + */ + public static final String ALERTS_CACHE_ENABLED = "alerts.cache.enabled"; + + /** + * The time after which cached alert information is flushed to the database. + */ + public static final String ALERTS_CACHE_FLUSH_INTERVAL = "alerts.cache.flush.interval"; + + /** + * The default time, in minutes, that cached alert information is flushed to + * the database. + */ + public static final String ALERTS_CACHE_FLUSH_INTERVAL_DEFAULT = "10"; + + /** + * The size of the alert cache. + */ + public static final String ALERTS_CACHE_SIZE = "alerts.cache.size"; + + /** + * The default size of the alerts cache. + */ + public static final String ALERTS_CACHE_SIZE_DEFAULT = "50000"; + + /** + * For HTTP Response header configuration for Ambari Server UI */ public static final String HTTP_STRICT_TRANSPORT_HEADER_VALUE_KEY = "http.strict-transport-security"; public static final String HTTP_STRICT_TRANSPORT_HEADER_VALUE_DEFAULT = "max-age=31536000"; @@ -2236,9 +2270,49 @@ public class Configuration { * @return {code true} if the experimental feature is enabled, {@code false} * otherwise. */ - @Experimental + @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) public boolean isExperimentalConcurrentStageProcessingEnabled() { return Boolean.parseBoolean(properties.getProperty( EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED, Boolean.FALSE.toString())); } + + /** + * If {@code true}, then alerts processed by the {@link AlertReceivedListener} + * will not write alert data to the database on every event. 
Instead, data + * like timestamps and text will be kept in a cache and flushed out + * periodically to the database. + * <p/> + * The default value is {@code false}. + * + * @return {@code true} if the cache is enabled, {@code false} otherwise. + */ + @Experimental(feature = ExperimentalFeature.ALERT_CACHING) + public boolean isAlertCacheEnabled() { + return Boolean.parseBoolean( + properties.getProperty(ALERTS_CACHE_ENABLED, Boolean.FALSE.toString())); + } + + /** + * Gets the interval at which cached alert data is written out to the + * database, if enabled. + * + * @return the cache flush interval, or + * {@value #ALERTS_CACHE_FLUSH_INTERVAL_DEFAULT} if not set. + */ + @Experimental(feature = ExperimentalFeature.ALERT_CACHING) + public int getAlertCacheFlushInterval() { + return Integer.parseInt( + properties.getProperty(ALERTS_CACHE_FLUSH_INTERVAL, ALERTS_CACHE_FLUSH_INTERVAL_DEFAULT)); + } + + /** + * Gets the size of the alerts cache, if enabled. + * + * @return the cache size, or {@value #ALERTS_CACHE_SIZE_DEFAULT} if + * not set. 
+ */ + @Experimental(feature = ExperimentalFeature.ALERT_CACHING) + public int getAlertCacheSize() { + return Integer.parseInt(properties.getProperty(ALERTS_CACHE_SIZE, ALERTS_CACHE_SIZE_DEFAULT)); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java index e59f63e..402af5c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java @@ -21,9 +21,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.inject.persist.Transactional; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; import org.apache.ambari.server.events.AlertEvent; import org.apache.ambari.server.events.AlertReceivedEvent; @@ -50,6 +50,7 @@ import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; /** * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent} @@ -65,6 +66,9 @@ public class AlertReceivedListener { private static final Logger LOG = LoggerFactory.getLogger(AlertReceivedListener.class); @Inject + Configuration m_configuration; + + @Inject AlertsDAO m_alertsDao; @Inject @@ -197,10 +201,11 @@ public class AlertReceivedListener { 
toCreateHistoryAndMerge.put(alert, current); oldStates.put(alert, oldState); - } } + // invokes the EntityManager create/merge on various entities in a single + // transaction saveEntities(toCreate, toMerge, toCreateHistoryAndMerge); //broadcast events @@ -241,11 +246,17 @@ public class AlertReceivedListener { } - Long getClusterIdByName(String clusterName) { + /** + * Gets the cluster ID given a name. + * + * @param clusterName + * @return + */ + private Long getClusterIdByName(String clusterName) { try { return m_clusters.get().getCluster(clusterName).getClusterId(); } catch (AmbariException e) { - LOG.warn("Cluster lookup failed for clusterName={}", clusterName); + LOG.warn("Cluster lookup failed for cluster named {}", clusterName); return null; } } @@ -257,15 +268,16 @@ public class AlertReceivedListener { * @param toCreateHistoryAndMerge - create new history, merge alert */ @Transactional - void saveEntities(Map<Alert, AlertCurrentEntity> toCreate, Map<Alert, AlertCurrentEntity> toMerge, - Map<Alert, AlertCurrentEntity> toCreateHistoryAndMerge) { + private void saveEntities(Map<Alert, AlertCurrentEntity> toCreate, + Map<Alert, AlertCurrentEntity> toMerge, + Map<Alert, AlertCurrentEntity> toCreateHistoryAndMerge) { for (Map.Entry<Alert, AlertCurrentEntity> entry : toCreate.entrySet()) { AlertCurrentEntity entity = entry.getValue(); m_alertsDao.create(entity); } for (AlertCurrentEntity entity : toMerge.values()) { - m_alertsDao.merge(entity); + m_alertsDao.merge(entity, m_configuration.isAlertCacheEnabled()); } for (Map.Entry<Alert, AlertCurrentEntity> entry : toCreateHistoryAndMerge.entrySet()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java index 0429534..145f841 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java @@ -17,12 +17,17 @@ */ package org.apache.ambari.server.orm.dao; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; @@ -30,8 +35,11 @@ import javax.persistence.criteria.CriteriaQuery; import javax.persistence.criteria.Order; import javax.persistence.metamodel.SingularAttribute; +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.api.query.JpaPredicateVisitor; import org.apache.ambari.server.api.query.JpaSortBuilder; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AlertCurrentRequest; import org.apache.ambari.server.controller.AlertHistoryRequest; import org.apache.ambari.server.controller.spi.Predicate; @@ -48,9 +56,13 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.alert.Scope; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -60,8 +72,15 @@ import 
com.google.inject.persist.Transactional; * The {@link AlertsDAO} class manages the {@link AlertHistoryEntity} and * {@link AlertCurrentEntity} instances. Each {@link AlertHistoryEntity} is * known as an "alert" that has been triggered and received. + * <p/> + * If alert caching is enabled, then updates to {@link AlertCurrentEntity} are + * not immediately persisted to JPA. Instead, they are kept in a cache and + * periodically flushed. This means that many queries will need to swap in the + * cached {@link AlertCurrentEntity} with that returned from the EclipseLink JPA + * entity manager. */ @Singleton +@Experimental(feature = ExperimentalFeature.ALERT_CACHING) public class AlertsDAO { /** * Logger. @@ -113,6 +132,69 @@ public class AlertsDAO { private Provider<Clusters> m_clusters; /** + * Configuration. + */ + private final Configuration m_configuration; + + /** + * A cache of current alert information. The {@link AlertCurrentEntity} + * instances cached are currently managed. This allows the cached instances to + * be more easily flushed from the cache to JPA. + * <p/> + * This also means that the cache is holding onto a rather large map of JPA + * entities. This could lead to OOM errors over time if the indirectly + * referenced entity map contains more than just {@link AlertCurrentEntity}. + */ + private LoadingCache<AlertCacheKey, AlertCurrentEntity> m_currentAlertCache = null; + + /** + * Constructor. 
+ * + */ + @Inject + public AlertsDAO(Configuration configuration) { + m_configuration = configuration; + + if( m_configuration.isAlertCacheEnabled() ){ + int maximumSize = m_configuration.getAlertCacheSize(); + + LOG.info("Alert caching is enabled (size={}, flushInterval={}m)", maximumSize, + m_configuration.getAlertCacheFlushInterval()); + + // construct a cache for current alerts which will prevent database hits + // on every heartbeat + m_currentAlertCache = CacheBuilder.newBuilder().maximumSize( + maximumSize).build(new CacheLoader<AlertCacheKey, AlertCurrentEntity>() { + @Override + public AlertCurrentEntity load(AlertCacheKey key) throws Exception { + LOG.debug("Cache miss for alert key {}, fetching from JPA", key); + + final AlertCurrentEntity alertCurrentEntity; + + long clusterId = key.getClusterId(); + String alertDefinitionName = key.getAlertDefinitionName(); + String hostName = key.getHostName(); + + if (StringUtils.isEmpty(hostName)) { + alertCurrentEntity = findCurrentByNameNoHostInternalInJPA(clusterId, + alertDefinitionName); + } else { + alertCurrentEntity = findCurrentByHostAndNameInJPA(clusterId, hostName, + alertDefinitionName); + } + + if (null == alertCurrentEntity) { + LOG.trace("Cache lookup failed for {} because the alert does not yet exist", key); + throw new AlertNotYetCreatedException(); + } + + return alertCurrentEntity; + } + }); + } + } + + /** * Gets an alert with the specified ID. 
* * @param alertId @@ -323,7 +405,14 @@ public class AlertsDAO { typedQuery.setMaxResults(request.Pagination.getPageSize()); } - return m_daoUtils.selectList(typedQuery); + List<AlertCurrentEntity> alerts = m_daoUtils.selectList(typedQuery); + + // if caching is enabled, replace results with cached values when present + if (m_configuration.isAlertCacheEnabled()) { + alerts = supplementWithCachedAlerts(alerts); + } + + return alerts; } /** @@ -349,7 +438,14 @@ public class AlertsDAO { TypedQuery<AlertCurrentEntity> query = m_entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.findAll", AlertCurrentEntity.class); - return m_daoUtils.selectList(query); + List<AlertCurrentEntity> alerts = m_daoUtils.selectList(query); + + // if caching is enabled, replace results with cached values when present + if (m_configuration.isAlertCacheEnabled()) { + alerts = supplementWithCachedAlerts(alerts); + } + + return alerts; } /** @@ -379,7 +475,14 @@ public class AlertsDAO { query.setParameter("definitionId", Long.valueOf(definitionId)); - return m_daoUtils.selectList(query); + List<AlertCurrentEntity> alerts = m_daoUtils.selectList(query); + + // if caching is enabled, replace results with cached values when present + if (m_configuration.isAlertCacheEnabled()) { + alerts = supplementWithCachedAlerts(alerts); + } + + return alerts; } /** @@ -395,7 +498,14 @@ public class AlertsDAO { query.setParameter("clusterId", Long.valueOf(clusterId)); - return m_daoUtils.selectList(query); + List<AlertCurrentEntity> alerts = m_daoUtils.selectList(query); + + // if caching is enabled, replace results with cached values when present + if (m_configuration.isAlertCacheEnabled()) { + alerts = supplementWithCachedAlerts(alerts); + } + + return alerts; } /** @@ -447,7 +557,7 @@ public class AlertsDAO { } /** - * Retrieves the summary information for all the hosts in the provided cluster. + * Retrieves the summary information for all the hosts in the provided cluster. 
* The result is mapping from hostname to summary DTO. * * @param clusterId @@ -548,13 +658,62 @@ public class AlertsDAO { query.setParameter("serviceName", serviceName); query.setParameter("inlist", EnumSet.of(Scope.ANY, Scope.SERVICE)); - return m_daoUtils.selectList(query); + List<AlertCurrentEntity> alerts = m_daoUtils.selectList(query); + + // if caching is enabled, replace results with cached values when present + if (m_configuration.isAlertCacheEnabled()) { + alerts = supplementWithCachedAlerts(alerts); + } + + return alerts; } - @RequiresSession + /** + * Locate the current alert for the provided service and alert name. This + * method will first consult the cache if configured with + * {@link Configuration#isAlertCacheEnabled()}. + * + * @param clusterId + * the cluster id + * @param hostName + * the name of the host (not {@code null}). + * @param alertName + * the name of the alert (not {@code null}). + * @return the current record, or {@code null} if not found + */ public AlertCurrentEntity findCurrentByHostAndName(long clusterId, String hostName, String alertName) { + if( m_configuration.isAlertCacheEnabled() ){ + AlertCacheKey key = new AlertCacheKey(clusterId, alertName, hostName); + + try { + return m_currentAlertCache.get(key); + } catch (ExecutionException executionException) { + Throwable cause = executionException.getCause(); + if (!(cause instanceof AlertNotYetCreatedException)) { + LOG.warn("Unable to retrieve alert for key {} from the cache", key); + } + } + } + + return findCurrentByHostAndNameInJPA(clusterId, hostName, alertName); + } + + /** + * Locate the current alert for the provided service and alert name. + * + * @param clusterId + * the cluster id + * @param hostName + * the name of the host (not {@code null}). + * @param alertName + * the name of the alert (not {@code null}). 
+ * @return the current record, or {@code null} if not found + */ + @RequiresSession + private AlertCurrentEntity findCurrentByHostAndNameInJPA(long clusterId, String hostName, + String alertName) { TypedQuery<AlertCurrentEntity> query = m_entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.findByHostAndName", AlertCurrentEntity.class); @@ -589,6 +748,12 @@ public class AlertsDAO { historyQuery.executeUpdate(); entityManager.clear(); + + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } } /** @@ -603,7 +768,15 @@ public class AlertsDAO { "AlertCurrentEntity.removeByHistoryId", AlertCurrentEntity.class); query.setParameter("historyId", historyId); - return query.executeUpdate(); + int rowsRemoved = query.executeUpdate(); + + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } + + return rowsRemoved; } /** @@ -616,7 +789,15 @@ public class AlertsDAO { TypedQuery<AlertCurrentEntity> query = m_entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.removeDisabled", AlertCurrentEntity.class); - return query.executeUpdate(); + int rowsRemoved = query.executeUpdate(); + + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } + + return rowsRemoved; } /** @@ -642,6 +823,12 @@ public class AlertsDAO { int removedItems = query.executeUpdate(); + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } + // publish the event to recalculate aggregates m_alertEventPublisher.publish(new AggregateAlertRecalculateEvent(clusterId)); 
return removedItems; @@ -667,6 +854,12 @@ public class AlertsDAO { query.setParameter("hostName", hostName); int removedItems = query.executeUpdate(); + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } + // publish the event to recalculate aggregates for every cluster since a host could potentially have several clusters try { Map<String, Cluster> clusters = m_clusters.get().getClusters(); @@ -715,6 +908,12 @@ public class AlertsDAO { int removedItems = query.executeUpdate(); + // if caching is enabled, invalidate the cache to force the latest values + // back from the DB + if (m_configuration.isAlertCacheEnabled()) { + m_currentAlertCache.invalidateAll(); + } + // publish the event to recalculate aggregates m_alertEventPublisher.publish(new AggregateAlertRecalculateEvent(clusterId)); @@ -800,27 +999,49 @@ public class AlertsDAO { */ @Transactional public AlertCurrentEntity merge(AlertCurrentEntity alert) { - return m_entityManagerProvider.get().merge(alert); + // perform the JPA merge + alert = m_entityManagerProvider.get().merge(alert); + + // if caching is enabled, update the cache + if( m_configuration.isAlertCacheEnabled() ){ + AlertCacheKey key = AlertCacheKey.build(alert); + m_currentAlertCache.put(key, alert); + } + + return alert; } /** - * Merge the specified current alert with the history and - * the existing alert in the database in a single transaction. + * Updates the internal cache of alerts with the specified alert. Unlike + * {@link #merge(AlertCurrentEntity)}, this is not transactional and only + * updates the cache. + * <p/> + * The alert should already exist in JPA - this is mainly to update the text + * and timestamp. * * @param alert - * the current alert to merge (not {@code null}). - * @param history - * the history to set to alert (not {@code null}). 
- * @return the updated current alert with merged content (never @code null}). + * the alert to update in the cache (not {@code null}). + * @param updateCacheOnly + * if {@code true}, then only the cache is updated and not JPA. + * @see Configuration#isAlertCacheEnabled() */ - @Transactional - public AlertCurrentEntity mergeAlertCurrentWithAlertHistory( - AlertCurrentEntity alert, AlertHistoryEntity history) { + public AlertCurrentEntity merge(AlertCurrentEntity alert, boolean updateCacheOnly) { + // cache only updates + if (updateCacheOnly) { + AlertCacheKey key = AlertCacheKey.build(alert); + + // cache not configured, log error + if (!m_configuration.isAlertCacheEnabled()) { + LOG.error( + "Unable to update a cached alert instance for {} because cached alerts are not enabled", + key); + } else { + // update cache and return alert; no database work + m_currentAlertCache.put(key, alert); + return alert; + } + } - // manually create the new history entity since we are merging into - // an existing current entity - create(history); - alert.setAlertHistory(history); return merge(alert); } @@ -865,13 +1086,45 @@ public class AlertsDAO { /** * Locate the current alert for the provided service and alert name, but when - * host is not set ({@code IS NULL}). - * @param clusterId the cluster id - * @param alertName the name of the alert + * host is not set ({@code IS NULL}). This method will first consult the cache + * if configured with {@link Configuration#isAlertCacheEnabled()}. 
+ * + * @param clusterId + * the cluster id + * @param alertName + * the name of the alert * @return the current record, or {@code null} if not found */ - @RequiresSession public AlertCurrentEntity findCurrentByNameNoHost(long clusterId, String alertName) { + if( m_configuration.isAlertCacheEnabled() ){ + AlertCacheKey key = new AlertCacheKey(clusterId, alertName); + + try { + return m_currentAlertCache.get(key); + } catch (ExecutionException executionException) { + Throwable cause = executionException.getCause(); + + if (!(cause instanceof AlertNotYetCreatedException)) { + LOG.warn("Unable to retrieve alert for key {} from, the cache", key); + } + } + } + + return findCurrentByNameNoHostInternalInJPA(clusterId, alertName); + } + + /** + * Locate the current alert for the provided service and alert name, but when + * host is not set ({@code IS NULL}). This method queries JPA directly and does not consult the cache. + * + * @param clusterId + * the cluster id + * @param alertName + * the name of the alert + * @return the current record, or {@code null} if not found + */ + @RequiresSession + private AlertCurrentEntity findCurrentByNameNoHostInternalInJPA(long clusterId, String alertName) { TypedQuery<AlertCurrentEntity> query = m_entityManagerProvider.get().createNamedQuery( "AlertCurrentEntity.findByNameAndNoHost", AlertCurrentEntity.class); @@ -882,6 +1135,57 @@ public class AlertsDAO { } /** + * Writes all cached {@link AlertCurrentEntity} instances to the database and + * clears the cache. 
+ */ + @Transactional + public void flushCachedEntitiesToJPA() { + if (!m_configuration.isAlertCacheEnabled()) { + LOG.warn("Unable to flush cached alerts to JPA because caching is not enabled"); + return; + } + + // capture for logging purposes + long cachedEntityCount = m_currentAlertCache.size(); + + ConcurrentMap<AlertCacheKey, AlertCurrentEntity> map = m_currentAlertCache.asMap(); + Set<Entry<AlertCacheKey, AlertCurrentEntity>> entries = map.entrySet(); + for (Entry<AlertCacheKey, AlertCurrentEntity> entry : entries) { + merge(entry.getValue()); + } + + m_currentAlertCache.invalidateAll(); + + LOG.info("Flushed {} cached alerts to the database", cachedEntityCount); + } + + /** + * Gets a list that is comprised of the original values replaced by any cached + * values from {@link #m_currentAlertCache}. This method should only be + * invoked if {@link Configuration#isAlertCacheEnabled()} is {@code true} + * + * @param alerts + * the list of alerts to iterate over and replace with cached + * instances. + * @return the list of alerts from JPA combined with any cached alerts. + */ + private List<AlertCurrentEntity> supplementWithCachedAlerts(List<AlertCurrentEntity> alerts) { + List<AlertCurrentEntity> cachedAlerts = new ArrayList<>(alerts.size()); + + for (AlertCurrentEntity alert : alerts) { + AlertCacheKey key = AlertCacheKey.build(alert); + AlertCurrentEntity cachedEntity = m_currentAlertCache.getIfPresent(key); + if (null != cachedEntity) { + alert = cachedEntity; + } + + cachedAlerts.add(alert); + } + + return cachedAlerts; + } + + /** * The {@link HistoryPredicateVisitor} is used to convert an Ambari * {@link Predicate} into a JPA {@link javax.persistence.criteria.Predicate}. */ @@ -946,4 +1250,164 @@ public class AlertsDAO { return AlertCurrentEntity_.getPredicateMapping().get(propertyId); } } + + /** + * The {@link AlertCacheKey} class is used as a key in the cache of + * {@link AlertCurrentEntity}. 
+ */ + private final static class AlertCacheKey { + private final long m_clusterId; + private final String m_hostName; + private final String m_alertDefinitionName; + + /** + * Constructor. + * + * @param clusterId + * @param alertDefinitionName + */ + private AlertCacheKey(long clusterId, String alertDefinitionName) { + this(clusterId, alertDefinitionName, null); + } + + /** + * Constructor. + * + * @param clusterId + * @param alertDefinitionName + * @param hostName + */ + private AlertCacheKey(long clusterId, String alertDefinitionName, String hostName) { + m_clusterId = clusterId; + m_alertDefinitionName = alertDefinitionName; + m_hostName = hostName; + } + + /** + * Builds a key from an entity. + * + * @param current + * the entity to create the key for. + * @return the key (never {@code null}). + */ + public static AlertCacheKey build(AlertCurrentEntity current) { + AlertHistoryEntity history = current.getAlertHistory(); + AlertCacheKey key = new AlertCacheKey(history.getClusterId(), + history.getAlertDefinition().getDefinitionName(), history.getHostName()); + + return key; + } + + /** + * Gets the ID of the cluster that the alert is for. + * + * @return the clusterId + */ + public long getClusterId() { + return m_clusterId; + } + + /** + * Gets the host name, or {@code null} if none. + * + * @return the hostName, or {@code null} if none. + */ + public String getHostName() { + return m_hostName; + } + + /** + * Gets the unique name of the alert definition. + * + * @return the alertDefinitionName + */ + public String getAlertDefinitionName() { + return m_alertDefinitionName; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((m_alertDefinitionName == null) ? 0 : m_alertDefinitionName.hashCode()); + result = prime * result + (int) (m_clusterId ^ (m_clusterId >>> 32)); + result = prime * result + ((m_hostName == null) ? 
0 : m_hostName.hashCode()); + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + AlertCacheKey other = (AlertCacheKey) obj; + + if (m_clusterId != other.m_clusterId) { + return false; + } + + if (m_alertDefinitionName == null) { + if (other.m_alertDefinitionName != null) { + return false; + } + } else if (!m_alertDefinitionName.equals(other.m_alertDefinitionName)) { + return false; + } + + if (m_hostName == null) { + if (other.m_hostName != null) { + return false; + } + } else if (!m_hostName.equals(other.m_hostName)) { + return false; + } + + return true; + } + + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + StringBuilder buffer = new StringBuilder("AlertCacheKey{"); + buffer.append("cluserId=").append(m_clusterId); + buffer.append(", alertName=").append(m_alertDefinitionName); + + if (null != m_hostName) { + buffer.append(", hostName=").append(m_hostName); + } + + buffer.append("}"); + return buffer.toString(); + } + } + + /** + * The {@link AlertNotYetCreatedException} is used as a way to signal to the + * {@link CacheLoader} that there is no value for the specified + * {@link AlertCacheKey}. Because this cache doesn't understand {@code null} + * values, we use the exception mechanism to indicate that it should be + * created and that the {@code null} value should not be cached. 
+ */ + @SuppressWarnings("serial") + private static final class AlertNotYetCreatedException extends Exception { + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java index 66b2a83..affe69e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java @@ -252,4 +252,25 @@ public class AlertCurrentEntity { int result = null != alertId ? alertId.hashCode() : 0; return result; } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + StringBuilder buffer = new StringBuilder("AlertCurrentEntity{"); + buffer.append("alertId=").append(alertId); + if( null != alertDefinition) { + buffer.append(", name=").append(alertDefinition.getDefinitionName()); + } + + if (null != alertHistory) { + buffer.append(", state=").append(alertHistory.getAlertState()); + } + + buffer.append(", latestTimestamp=").append(latestTimestamp); + + buffer.append("}"); + return buffer.toString(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/main/java/org/apache/ambari/server/state/services/CachedAlertFlushService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/CachedAlertFlushService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/CachedAlertFlushService.java new file mode 100644 index 0000000..72bf68a --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/CachedAlertFlushService.java 
@@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.state.services; + +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; +import org.apache.ambari.server.AmbariService; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.orm.dao.AlertsDAO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.inject.Inject; + +/** + * The {@link CachedAlertFlushService} is used to periodically flush cached + * alert data to the database. This service is controlled by + * {@link Configuration#isAlertCacheEnabled()} and + * {@link Configuration#getAlertCacheFlushInterval()}. + */ +@AmbariService +@Experimental(feature = ExperimentalFeature.ALERT_CACHING) +public class CachedAlertFlushService extends AbstractScheduledService { + + /** + * Logger. + */ + private final static Logger LOG = LoggerFactory.getLogger(CachedAlertFlushService.class); + + /** + * Configuration. 
+ */ + @Inject + private Configuration m_configuration; + + /** + * Used for flushing cached entities to the database. + */ + @Inject + private AlertsDAO m_alertsDAO; + + /** + * {@inheritDoc} + */ + @Override + protected Scheduler scheduler() { + int flushIntervalInMinutes = m_configuration.getAlertCacheFlushInterval(); + return Scheduler.newFixedDelaySchedule(flushIntervalInMinutes, flushIntervalInMinutes, + TimeUnit.MINUTES); + } + + /** + * {@inheritDoc} + * <p/> + * Invokes {@link #stop()} if not enabled. + */ + @Override + protected void startUp() throws Exception { + boolean enabled = m_configuration.isAlertCacheEnabled(); + if (!enabled) { + stop(); + } + } + + /** + * {@inheritDoc} + * <p/> + * Flushes cached alerts to the database. + */ + @Override + protected void runOneIteration() throws Exception { + try { + LOG.info("Flushing cached alerts to the database"); + m_alertsDAO.flushCachedEntitiesToJPA(); + } catch (Exception exception) { + LOG.error("Unable to flush cached alerts to the database", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4de51625/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java index 573c02f..0f349fa 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java @@ -32,8 +32,6 @@ import java.lang.reflect.Method; import java.util.Map; import java.util.Properties; -import junit.framework.Assert; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration.ConnectionPoolType; import 
org.apache.ambari.server.configuration.Configuration.DatabaseType; @@ -53,6 +51,8 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import junit.framework.Assert; + @RunWith(PowerMockRunner.class) @PrepareForTest({ Configuration.class }) @PowerMockIgnore( {"javax.management.*", "javax.crypto.*"}) @@ -497,6 +497,22 @@ public class ConfigurationTest { Boolean.TRUE.toString()); Assert.assertTrue(configuration.isExperimentalConcurrentStageProcessingEnabled()); + } + + @Test + public void testAlertCaching() throws Exception { + final Properties ambariProperties = new Properties(); + final Configuration configuration = new Configuration(ambariProperties); + Assert.assertFalse(configuration.isAlertCacheEnabled()); + + ambariProperties.setProperty(Configuration.ALERTS_CACHE_ENABLED, Boolean.TRUE.toString()); + ambariProperties.setProperty(Configuration.ALERTS_CACHE_FLUSH_INTERVAL, "60"); + ambariProperties.setProperty(Configuration.ALERTS_CACHE_SIZE, "1000"); + + Assert.assertTrue(configuration.isAlertCacheEnabled()); + Assert.assertEquals(60, configuration.getAlertCacheFlushInterval()); + Assert.assertEquals(1000, configuration.getAlertCacheSize()); } + }