This is an automated email from the ASF dual-hosted git repository. hapylestat pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 3e46d76 AMBARI-25576. Primary key duplication error during flushing alerts from alerts cache. (#3251) (dvitiuk via dgrinenko) 3e46d76 is described below commit 3e46d76beafd9a9ff5e05f774d39f2d296069a37 Author: dvitiiuk <dmitriiviti...@gmail.com> AuthorDate: Thu Nov 5 18:22:11 2020 +0200 AMBARI-25576. Primary key duplication error during flushing alerts from alerts cache. (#3251) (dvitiuk via dgrinenko) --- .../listeners/alerts/AlertReceivedListener.java | 29 +------- .../apache/ambari/server/orm/dao/AlertsDAO.java | 77 ++++++++++++++++++++-- .../ambari/server/utils/EventBusSynchronizer.java | 49 ++++++++++++++ .../ambari/server/orm/dao/AlertsDAOTest.java | 29 +++++++- .../state/alerts/AggregateAlertListenerTest.java | 2 + 5 files changed, 152 insertions(+), 34 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java index db80e8c..57972ec 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java @@ -66,7 +66,6 @@ import com.google.common.util.concurrent.Striped; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; -import com.google.inject.persist.Transactional; /** * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent} @@ -385,7 +384,7 @@ public class AlertReceivedListener { // invokes the EntityManager create/merge on various entities in a single // transaction - saveEntities(toMerge, toCreateHistoryAndMerge); + m_alertsDao.saveEntities(toMerge, toCreateHistoryAndMerge); // broadcast events for (AlertEvent eventToFire : alertEvents) { @@ -446,32 +445,6 @@ public class AlertReceivedListener { } /** - * Saves alert and alert history entities in single transaction - * @param toMerge - merge alert only - * @param toCreateHistoryAndMerge - create new history, merge alert - */ - @Transactional - void saveEntities(List<AlertCurrentEntity> toMerge, - List<AlertCurrentEntity> toCreateHistoryAndMerge) { - for (AlertCurrentEntity entity : toMerge) { - m_alertsDao.merge(entity, m_configuration.isAlertCacheEnabled()); - } - - for (AlertCurrentEntity entity : toCreateHistoryAndMerge) { - m_alertsDao.create(entity.getAlertHistory()); - m_alertsDao.merge(entity); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}", - entity.getAlertId(), entity.getLatestTimestamp(), - entity.getAlertHistory().getAlertId(), - entity.getAlertHistory().getAlertState()); - } - } - } - - /** * Gets whether the specified alert is valid for its reported cluster, * service, component, and host. This method is necessary for the following * cases diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java index dceafcb..374e127 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java @@ -985,8 +985,18 @@ public class AlertsDAO implements Cleanable { * the alert to merge (not {@code null}). * @return the updated alert with merged content (never {@code null}). */ - @Transactional public AlertHistoryEntity merge(AlertHistoryEntity alert) { + if (m_configuration.isAlertCacheEnabled()) { + synchronized (this) { + return mergeTransactional(alert); + } + } else { + return mergeTransactional(alert); + } + } + + @Transactional + protected AlertHistoryEntity mergeTransactional(AlertHistoryEntity alert) { return m_entityManagerProvider.get().merge(alert); } @@ -1033,8 +1043,18 @@ public class AlertsDAO implements Cleanable { * the current alert to merge (not {@code null}). * @return the updated current alert with merged content (never {@code null}). */ - @Transactional public AlertCurrentEntity merge(AlertCurrentEntity alert) { + if (m_configuration.isAlertCacheEnabled()) { + synchronized (this) { + return mergeTransactional(alert); + } + } else { + return mergeTransactional(alert); + } + } + + @Transactional + protected AlertCurrentEntity mergeTransactional(AlertCurrentEntity alert) { // perform the JPA merge alert = m_entityManagerProvider.get().merge(alert); @@ -1174,12 +1194,18 @@ public class AlertsDAO implements Cleanable { * Writes all cached {@link AlertCurrentEntity} instances to the database and * clears the cache. */ - @Transactional public void flushCachedEntitiesToJPA() { - if (!m_configuration.isAlertCacheEnabled()) { + if (m_configuration.isAlertCacheEnabled()) { + synchronized (this) { + flushCachedEntitiesToJPATransactional(); + } + } else { LOG.warn("Unable to flush cached alerts to JPA because caching is not enabled"); - return; } + } + + @Transactional + protected void flushCachedEntitiesToJPATransactional() { // capture for logging purposes long cachedEntityCount = m_currentAlertCache.size(); @@ -1212,6 +1238,10 @@ public class AlertsDAO implements Cleanable { AlertCacheKey key = AlertCacheKey.build(alert); AlertCurrentEntity cachedEntity = m_currentAlertCache.getIfPresent(key); if (null != cachedEntity) { + if (cachedEntity.getAlertHistory() == null) { + LOG.warn("There is current entity with null history in the cache, currentId: {}, persisted historyId: {}", + cachedEntity.getAlertId(), alert.getHistoryId()); + } alert = cachedEntity; } @@ -1584,4 +1614,41 @@ public class AlertsDAO implements Cleanable { return affectedRows; } + /** + * Saves alert and alert history entities in single transaction + * @param toMerge - merge alert only + * @param toCreateHistoryAndMerge - create new history, merge alert + */ + public void saveEntities(List<AlertCurrentEntity> toMerge, + List<AlertCurrentEntity> toCreateHistoryAndMerge) { + if (m_configuration.isAlertCacheEnabled()) { + synchronized (this) { + saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge); + } + } else { + saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge); + } + } + + @Transactional + protected void saveEntitiesTransactional(List<AlertCurrentEntity> toMerge, + List<AlertCurrentEntity> toCreateHistoryAndMerge) { + for (AlertCurrentEntity entity : toMerge) { + merge(entity, m_configuration.isAlertCacheEnabled()); + } + + for (AlertCurrentEntity entity : toCreateHistoryAndMerge) { + create(entity.getAlertHistory()); + merge(entity); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}", + entity.getAlertId(), entity.getLatestTimestamp(), + entity.getAlertHistory().getAlertId(), + entity.getAlertHistory().getAlertState()); + } + } + } + } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java index c72ffbe..ecde673 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java @@ -25,10 +25,13 @@ import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeList import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener; +import org.apache.ambari.server.events.listeners.services.ServiceUpdateListener; import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener; import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener; +import org.apache.ambari.server.events.listeners.upgrade.UpgradeUpdateListener; import org.apache.ambari.server.events.publishers.AlertEventPublisher; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -98,6 +101,24 @@ public class EventBusSynchronizer { } /** + * Force both {@link EventBus} from {@link STOMPUpdatePublisher} to be serial + * and synchronous. Also register the known listeners. Registering known + * listeners is necessary since the event bus was replaced. + * + * @param injector + */ + public static void synchronizeSTOMPUpdatePublisher(Injector injector) { + EventBus agentEventBus = new EventBus(); + EventBus apiEventBus = new EventBus(); + STOMPUpdatePublisher publisher = injector.getInstance(STOMPUpdatePublisher.class); + + replaceSTOMPEventBuses(STOMPUpdatePublisher.class, publisher, agentEventBus, apiEventBus); + + // register common agent event listeners + registerSTOMPApiListeners(injector, apiEventBus); + } + + /** * Register the normal listeners with the replaced synchronous bus. * * @param injector @@ -125,6 +146,18 @@ public class EventBusSynchronizer { synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); } + /** + * Register the normal listeners with the replaced synchronous bus. + * + * @param injector + * @param synchronizedBus + */ + private static void registerSTOMPApiListeners(Injector injector, + EventBus synchronizedBus) { + synchronizedBus.register(injector.getInstance(ServiceUpdateListener.class)); + synchronizedBus.register(injector.getInstance(UpgradeUpdateListener.class)); + } + private static void replaceEventBus(Class<?> eventPublisherClass, Object instance, EventBus eventBus) { @@ -136,4 +169,20 @@ public class EventBusSynchronizer { throw new RuntimeException(exception); } } + + private static void replaceSTOMPEventBuses(Class<?> eventPublisherClass, + Object instance, EventBus agentEventBus, EventBus apiEventBus) { + + try { + Field agentEventBusField = eventPublisherClass.getDeclaredField("agentEventBus"); + agentEventBusField.setAccessible(true); + agentEventBusField.set(instance, agentEventBus); + + Field apiEventBusField = eventPublisherClass.getDeclaredField("apiEventBus"); + apiEventBusField.setAccessible(true); + apiEventBusField.set(instance, apiEventBus); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java index 933743f..cd41edb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java @@ -48,6 +48,9 @@ import org.apache.ambari.server.controller.spi.SortRequest; import org.apache.ambari.server.controller.spi.SortRequest.Order; import org.apache.ambari.server.controller.spi.SortRequestProperty; import org.apache.ambari.server.controller.utilities.PredicateBuilder; +import org.apache.ambari.server.events.publishers.HostComponentUpdateEventPublisher; +import org.apache.ambari.server.events.publishers.RequestUpdateEventPublisher; +import org.apache.ambari.server.events.publishers.ServiceUpdateEventPublisher; import org.apache.ambari.server.orm.AlertDaoHelper; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -68,13 +71,17 @@ import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.alert.SourceType; import org.apache.ambari.server.utils.EventBusSynchronizer; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Module; import com.google.inject.persist.UnitOfWork; +import com.google.inject.util.Modules; /** * Tests {@link AlertsDAO}. @@ -102,7 +109,8 @@ public class AlertsDAOTest { */ @Before public void setup() throws Exception { - m_injector = Guice.createInjector(new InMemoryDefaultTestModule()); + m_injector = Guice.createInjector(Modules.override( + new InMemoryDefaultTestModule()).with(new MockModule())); m_injector.getInstance(GuiceJpaInitializer.class); m_injector.getInstance(UnitOfWork.class).begin(); @@ -117,6 +125,8 @@ public class AlertsDAOTest { // !!! need a synchronous op for testing EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector); + EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector); + EventBusSynchronizer.synchronizeSTOMPUpdatePublisher(m_injector); // install YARN so there is at least 1 service installed and no // unexpected alerts since the test YARN service doesn't have any alerts @@ -1476,4 +1486,21 @@ public class AlertsDAOTest { currentAlerts = m_dao.findCurrent(); assertEquals(4, currentAlerts.size()); } + + private class MockModule implements Module { + + @Override + public void configure(Binder binder) { + HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher = + EasyMock.createNiceMock(HostComponentUpdateEventPublisher.class); + RequestUpdateEventPublisher requestUpdateEventPublisher = + EasyMock.createNiceMock(RequestUpdateEventPublisher.class); + ServiceUpdateEventPublisher serviceUpdateEventPublisher = + EasyMock.createNiceMock(ServiceUpdateEventPublisher.class); + + binder.bind(HostComponentUpdateEventPublisher.class).toInstance(hostComponentUpdateEventPublisher); + binder.bind(RequestUpdateEventPublisher.class).toInstance(requestUpdateEventPublisher); + binder.bind(ServiceUpdateEventPublisher.class).toInstance(serviceUpdateEventPublisher); + } + } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java index 503c11f..2a63b42 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java @@ -112,6 +112,8 @@ public class AggregateAlertListenerTest { EasyMock.expect( m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn( summaryDTO).atLeastOnce(); + m_alertsDao.saveEntities(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); EasyMock.replay(m_alertsDao, m_aggregateMapping, currentEntityMock);