This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 3e46d76  AMBARI-25576. Primary key duplication error during flushing 
alerts from alerts cache. (#3251) (dvitiuk via dgrinenko)
3e46d76 is described below

commit 3e46d76beafd9a9ff5e05f774d39f2d296069a37
Author: dvitiiuk <dmitriiviti...@gmail.com>
AuthorDate: Thu Nov 5 18:22:11 2020 +0200

    AMBARI-25576. Primary key duplication error during flushing alerts from 
alerts cache. (#3251) (dvitiuk via dgrinenko)
---
 .../listeners/alerts/AlertReceivedListener.java    | 29 +-------
 .../apache/ambari/server/orm/dao/AlertsDAO.java    | 77 ++++++++++++++++++++--
 .../ambari/server/utils/EventBusSynchronizer.java  | 49 ++++++++++++++
 .../ambari/server/orm/dao/AlertsDAOTest.java       | 29 +++++++-
 .../state/alerts/AggregateAlertListenerTest.java   |  2 +
 5 files changed, 152 insertions(+), 34 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index db80e8c..57972ec 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -66,7 +66,6 @@ import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
 
 /**
  * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
@@ -385,7 +384,7 @@ public class AlertReceivedListener {
 
     // invokes the EntityManager create/merge on various entities in a single
     // transaction
-    saveEntities(toMerge, toCreateHistoryAndMerge);
+    m_alertsDao.saveEntities(toMerge, toCreateHistoryAndMerge);
 
     // broadcast events
     for (AlertEvent eventToFire : alertEvents) {
@@ -446,32 +445,6 @@ public class AlertReceivedListener {
   }
 
   /**
-   * Saves alert and alert history entities in single transaction
-   * @param toMerge - merge alert only
-   * @param toCreateHistoryAndMerge - create new history, merge alert
-   */
-  @Transactional
-  void saveEntities(List<AlertCurrentEntity> toMerge,
-      List<AlertCurrentEntity> toCreateHistoryAndMerge) {
-    for (AlertCurrentEntity entity : toMerge) {
-      m_alertsDao.merge(entity, m_configuration.isAlertCacheEnabled());
-    }
-
-    for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
-      m_alertsDao.create(entity.getAlertHistory());
-      m_alertsDao.merge(entity);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId 
{}, HistoryState {}",
-            entity.getAlertId(), entity.getLatestTimestamp(),
-            entity.getAlertHistory().getAlertId(),
-            entity.getAlertHistory().getAlertState());
-      }
-    }
-  }
-
-  /**
    * Gets whether the specified alert is valid for its reported cluster,
    * service, component, and host. This method is necessary for the following
    * cases
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index dceafcb..374e127 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -985,8 +985,18 @@ public class AlertsDAO implements Cleanable {
    *          the alert to merge (not {@code null}).
    * @return the updated alert with merged content (never {@code null}).
    */
-  @Transactional
   public AlertHistoryEntity merge(AlertHistoryEntity alert) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        return mergeTransactional(alert);
+      }
+    } else {
+      return mergeTransactional(alert);
+    }
+  }
+
+  @Transactional
+  protected AlertHistoryEntity mergeTransactional(AlertHistoryEntity alert) {
     return m_entityManagerProvider.get().merge(alert);
   }
 
@@ -1033,8 +1043,18 @@ public class AlertsDAO implements Cleanable {
    *          the current alert to merge (not {@code null}).
    * @return the updated current alert with merged content (never {@code 
null}).
    */
-  @Transactional
   public AlertCurrentEntity merge(AlertCurrentEntity alert) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        return mergeTransactional(alert);
+      }
+    } else {
+      return mergeTransactional(alert);
+    }
+  }
+
+  @Transactional
+  protected AlertCurrentEntity mergeTransactional(AlertCurrentEntity alert) {
     // perform the JPA merge
     alert = m_entityManagerProvider.get().merge(alert);
 
@@ -1174,12 +1194,18 @@ public class AlertsDAO implements Cleanable {
    * Writes all cached {@link AlertCurrentEntity} instances to the database and
    * clears the cache.
    */
-  @Transactional
   public void flushCachedEntitiesToJPA() {
-    if (!m_configuration.isAlertCacheEnabled()) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        flushCachedEntitiesToJPATransactional();
+      }
+    } else {
       LOG.warn("Unable to flush cached alerts to JPA because caching is not 
enabled");
-      return;
     }
+  }
+
+  @Transactional
+  protected void flushCachedEntitiesToJPATransactional() {
 
     // capture for logging purposes
     long cachedEntityCount = m_currentAlertCache.size();
@@ -1212,6 +1238,10 @@ public class AlertsDAO implements Cleanable {
       AlertCacheKey key = AlertCacheKey.build(alert);
       AlertCurrentEntity cachedEntity = m_currentAlertCache.getIfPresent(key);
       if (null != cachedEntity) {
+        if (cachedEntity.getAlertHistory() == null) {
+          LOG.warn("There is current entity with null history in the cache, 
currentId: {}, persisted historyId: {}",
+              cachedEntity.getAlertId(), alert.getHistoryId());
+        }
         alert = cachedEntity;
       }
 
@@ -1584,4 +1614,41 @@ public class AlertsDAO implements Cleanable {
     return affectedRows;
   }
 
+  /**
+   * Saves alert and alert history entities in single transaction
+   * @param toMerge - merge alert only
+   * @param toCreateHistoryAndMerge - create new history, merge alert
+   */
+  public void saveEntities(List<AlertCurrentEntity> toMerge,
+                                 List<AlertCurrentEntity> 
toCreateHistoryAndMerge) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+      }
+    } else {
+      saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+    }
+  }
+
+  @Transactional
+  protected void saveEntitiesTransactional(List<AlertCurrentEntity> toMerge,
+                    List<AlertCurrentEntity> toCreateHistoryAndMerge) {
+    for (AlertCurrentEntity entity : toMerge) {
+      merge(entity, m_configuration.isAlertCacheEnabled());
+    }
+
+    for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
+      create(entity.getAlertHistory());
+      merge(entity);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId 
{}, HistoryState {}",
+            entity.getAlertId(), entity.getLatestTimestamp(),
+            entity.getAlertHistory().getAlertId(),
+            entity.getAlertHistory().getAlertState());
+      }
+    }
+  }
+
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
index c72ffbe..ecde673 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -25,10 +25,13 @@ import 
org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeList
 import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
 import 
org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
 import 
org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import 
org.apache.ambari.server.events.listeners.services.ServiceUpdateListener;
 import 
org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
 import 
org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
+import org.apache.ambari.server.events.listeners.upgrade.UpgradeUpdateListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
@@ -98,6 +101,24 @@ public class EventBusSynchronizer {
   }
 
   /**
+   * Force both {@link EventBus} from {@link STOMPUpdatePublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static void synchronizeSTOMPUpdatePublisher(Injector injector) {
+    EventBus agentEventBus = new EventBus();
+    EventBus apiEventBus = new EventBus();
+    STOMPUpdatePublisher publisher = 
injector.getInstance(STOMPUpdatePublisher.class);
+
+    replaceSTOMPEventBuses(STOMPUpdatePublisher.class, publisher, 
agentEventBus, apiEventBus);
+
+    // register common agent event listeners
+    registerSTOMPApiListeners(injector, apiEventBus);
+  }
+
+  /**
    * Register the normal listeners with the replaced synchronous bus.
    *
    * @param injector
@@ -125,6 +146,18 @@ public class EventBusSynchronizer {
     
synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
   }
 
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerSTOMPApiListeners(Injector injector,
+      EventBus synchronizedBus) {
+    
synchronizedBus.register(injector.getInstance(ServiceUpdateListener.class));
+    
synchronizedBus.register(injector.getInstance(UpgradeUpdateListener.class));
+  }
+
   private static void replaceEventBus(Class<?> eventPublisherClass,
       Object instance, EventBus eventBus) {
 
@@ -136,4 +169,20 @@ public class EventBusSynchronizer {
       throw new RuntimeException(exception);
     }
   }
+
+  private static void replaceSTOMPEventBuses(Class<?> eventPublisherClass,
+      Object instance, EventBus agentEventBus, EventBus apiEventBus) {
+
+    try {
+      Field agentEventBusField = 
eventPublisherClass.getDeclaredField("agentEventBus");
+      agentEventBusField.setAccessible(true);
+      agentEventBusField.set(instance, agentEventBus);
+
+      Field apiEventBusField = 
eventPublisherClass.getDeclaredField("apiEventBus");
+      apiEventBusField.setAccessible(true);
+      apiEventBusField.set(instance, apiEventBus);
+    } catch (Exception exception) {
+      throw new RuntimeException(exception);
+    }
+  }
 }
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index 933743f..cd41edb 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -48,6 +48,9 @@ import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.SortRequest.Order;
 import org.apache.ambari.server.controller.spi.SortRequestProperty;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import 
org.apache.ambari.server.events.publishers.HostComponentUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.RequestUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.ServiceUpdateEventPublisher;
 import org.apache.ambari.server.orm.AlertDaoHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -68,13 +71,17 @@ import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
 import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Module;
 import com.google.inject.persist.UnitOfWork;
+import com.google.inject.util.Modules;
 
 /**
  * Tests {@link AlertsDAO}.
@@ -102,7 +109,8 @@ public class AlertsDAOTest {
    */
   @Before
   public void setup() throws Exception {
-    m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    m_injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
     m_injector.getInstance(GuiceJpaInitializer.class);
     m_injector.getInstance(UnitOfWork.class).begin();
 
@@ -117,6 +125,8 @@ public class AlertsDAOTest {
 
     // !!! need a synchronous op for testing
     EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
+    EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
+    EventBusSynchronizer.synchronizeSTOMPUpdatePublisher(m_injector);
 
     // install YARN so there is at least 1 service installed and no
     // unexpected alerts since the test YARN service doesn't have any alerts
@@ -1476,4 +1486,21 @@ public class AlertsDAOTest {
     currentAlerts = m_dao.findCurrent();
     assertEquals(4, currentAlerts.size());
   }
+
+  private class MockModule implements Module {
+
+    @Override
+    public void configure(Binder binder) {
+      HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher =
+          EasyMock.createNiceMock(HostComponentUpdateEventPublisher.class);
+      RequestUpdateEventPublisher requestUpdateEventPublisher =
+          EasyMock.createNiceMock(RequestUpdateEventPublisher.class);
+      ServiceUpdateEventPublisher serviceUpdateEventPublisher =
+          EasyMock.createNiceMock(ServiceUpdateEventPublisher.class);
+
+      
binder.bind(HostComponentUpdateEventPublisher.class).toInstance(hostComponentUpdateEventPublisher);
+      
binder.bind(RequestUpdateEventPublisher.class).toInstance(requestUpdateEventPublisher);
+      
binder.bind(ServiceUpdateEventPublisher.class).toInstance(serviceUpdateEventPublisher);
+    }
+  }
 }
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
index 503c11f..2a63b42 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
@@ -112,6 +112,8 @@ public class AggregateAlertListenerTest {
     EasyMock.expect(
         m_alertsDao.findAggregateCounts(EasyMock.anyLong(), 
EasyMock.eq("mock-aggregate-alert"))).andReturn(
         summaryDTO).atLeastOnce();
+    m_alertsDao.saveEntities(EasyMock.anyObject(), EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
 
     EasyMock.replay(m_alertsDao, m_aggregateMapping, currentEntityMock);
 

Reply via email to