This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0d34375  AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (#2448)
0d34375 is described below

commit 0d343757add5e64cdff171ee05bc46044a0ecaf6
Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org>
AuthorDate: Fri Oct 19 15:14:10 2018 +0300

    AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (#2448)
    
    * AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)
    
    * AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)
---
 .../server/actionmanager/ActionScheduler.java      |  3 +
 .../ambari/server/events/ServiceUpdateEvent.java   | 16 ++++-
 .../listeners/services/ServiceUpdateListener.java  | 22 ++-----
 .../publishers/BufferedUpdateEventPublisher.java   | 51 ++++++++-------
 .../HostComponentUpdateEventPublisher.java         | 30 ++-------
 .../publishers/RequestUpdateEventPublisher.java    | 70 +++++++--------------
 .../publishers/ServiceUpdateEventPublisher.java    | 72 ++++++++++++----------
 7 files changed, 120 insertions(+), 144 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 2dfedb2..735a774 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -895,6 +895,9 @@ class ActionScheduler implements Runnable {
         }
 
         updateRoleStats(status, roleStats.get(roleStr));
+        if (status == HostRoleStatus.FAILED) {
+          LOG.info("Role {} on host {} was failed", roleStr, host);
+        }
 
       }
     }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
index 34bc106..2492929 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
@@ -21,6 +21,7 @@ package org.apache.ambari.server.events;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.State;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -42,12 +43,17 @@ public class ServiceUpdateEvent extends STOMPEvent {
   @JsonProperty("state")
   private State state;
 
-  public ServiceUpdateEvent(String clusterName, MaintenanceState 
maintenanceState, String serviceName, State state) {
+  @JsonIgnore
+  private boolean stateChanged = false;
+
+  public ServiceUpdateEvent(String clusterName, MaintenanceState 
maintenanceState, String serviceName, State state,
+                            boolean stateChanged) {
     super(Type.SERVICE);
     this.clusterName = clusterName;
     this.maintenanceState = maintenanceState;
     this.serviceName = serviceName;
     this.state = state;
+    this.stateChanged = stateChanged;
   }
 
   public String getClusterName() {
@@ -82,6 +88,14 @@ public class ServiceUpdateEvent extends STOMPEvent {
     this.state = state;
   }
 
+  public boolean isStateChanged() {
+    return stateChanged;
+  }
+
+  public void setStateChanged(boolean stateChanged) {
+    this.stateChanged = stateChanged;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index c35cc08..4cbac1f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -24,18 +24,14 @@ import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
-import 
org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
-import 
org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
 import org.apache.ambari.server.events.HostComponentUpdate;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
 import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.ServiceUpdateEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
-import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.State;
 
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
@@ -45,14 +41,10 @@ import com.google.inject.Singleton;
 @Singleton
 @EagerSingleton
 public class ServiceUpdateListener {
-  private Map<Long, Map<String, State>> states = new HashMap<>();
 
   private STOMPUpdatePublisher STOMPUpdatePublisher;
 
   @Inject
-  private ServiceDesiredStateDAO serviceDesiredStateDAO;
-
-  @Inject
   private Provider<Clusters> m_clusters;
 
   @Inject
@@ -75,15 +67,8 @@ public class ServiceUpdateListener {
       Long clusterId = clusterServices.getKey();
       String clusterName = 
m_clusters.get().getClusterById(clusterId).getClusterName();
       for (String serviceName : clusterServices.getValue()) {
-        ServiceCalculatedState serviceCalculatedState = 
ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
-        State serviceState = serviceCalculatedState.getState(clusterName, 
serviceName);
-
-        // retrieve state from cache
-        if (states.containsKey(clusterId) && 
states.get(clusterId).containsKey(serviceName) && 
states.get(clusterId).get(serviceName).equals(serviceState)) {
-          continue;
-        }
-        states.computeIfAbsent(clusterId, c -> new 
HashMap<>()).put(serviceName, serviceState);
-        STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, 
serviceName, serviceState));
+        STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, 
serviceName, null,
+            true));
       }
     }
   }
@@ -99,6 +84,7 @@ public class ServiceUpdateListener {
 
     MaintenanceState maintenanceState = event.getMaintenanceState();
 
-    STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, 
maintenanceState, serviceName, null));
+    STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, 
maintenanceState, serviceName, null,
+        false));
   }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index e02785f..25d396c 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
@@ -34,31 +32,25 @@ import com.google.inject.Singleton;
 public abstract class BufferedUpdateEventPublisher<T> {
 
   private static final long TIMEOUT = 1000L;
-  private final AtomicLong previousTime = new AtomicLong(0);
-  private final AtomicBoolean collecting = new AtomicBoolean(false);
   private final ConcurrentLinkedQueue<T> buffer = new 
ConcurrentLinkedQueue<>();
-  private final ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
+
+  private ScheduledExecutorService scheduledExecutorService;
 
   public void publish(T event, EventBus m_eventBus) {
-    long eventTime = System.currentTimeMillis();
-    if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
-      buffer.add(event);
-      collecting.set(true);
-      scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
-          TIMEOUT, TimeUnit.MILLISECONDS);
-    } else if (collecting.get()) {
-      buffer.add(event);
-    } else {
-      //TODO add logging and metrics posting
-      previousTime.set(eventTime);
-      m_eventBus.post(event);
+    if (scheduledExecutorService == null) {
+      scheduledExecutorService =
+          Executors.newScheduledThreadPool(1);
+      scheduledExecutorService
+          .scheduleWithFixedDelay(getScheduledPublisher(m_eventBus), TIMEOUT, 
TIMEOUT, TimeUnit.MILLISECONDS);
     }
+    buffer.add(event);
   }
 
-  protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
+  protected MergingRunnable getScheduledPublisher(EventBus m_eventBus) {
+    return new MergingRunnable(m_eventBus);
+  }
 
   protected List<T> retrieveBuffer() {
-    resetCollecting();
     List<T> bufferContent = new ArrayList<>();
     while (!buffer.isEmpty()) {
       bufferContent.add(buffer.poll());
@@ -66,8 +58,23 @@ public abstract class BufferedUpdateEventPublisher<T> {
     return bufferContent;
   }
 
-  protected void resetCollecting() {
-    previousTime.set(System.currentTimeMillis());
-    collecting.set(false);
+  public abstract void mergeBufferAndPost(List<T> events, EventBus m_eventBus);
+
+  private class MergingRunnable implements Runnable {
+
+    private final EventBus m_eventBus;
+
+    public MergingRunnable(EventBus m_eventBus) {
+      this.m_eventBus = m_eventBus;
+    }
+
+    @Override
+    public final void run() {
+      List<T> events = retrieveBuffer();
+      if (events.isEmpty()) {
+        return;
+      }
+      mergeBufferAndPost(events, m_eventBus);
+    }
   }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index f7fea1d..d9f51e8 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -31,30 +31,12 @@ import com.google.inject.Singleton;
 public class HostComponentUpdateEventPublisher extends 
BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
-    return new HostComponentsEventRunnable(m_eventBus);
-  }
-
-  private class HostComponentsEventRunnable implements Runnable {
-
-    private final EventBus eventBus;
-
-    public HostComponentsEventRunnable(EventBus eventBus) {
-      this.eventBus = eventBus;
-    }
-
-    @Override
-    public void run() {
-      List<HostComponentsUpdateEvent> hostComponentUpdateEvents = 
retrieveBuffer();
-      if (hostComponentUpdateEvents.isEmpty()) {
-        return;
-      }
-      List<HostComponentUpdate> hostComponentUpdates = 
hostComponentUpdateEvents.stream().flatMap(
-          u -> 
u.getHostComponentUpdates().stream()).collect(Collectors.toList());
+  public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events, 
EventBus m_eventBus) {
+    List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
+        u -> 
u.getHostComponentUpdates().stream()).collect(Collectors.toList());
 
-      HostComponentsUpdateEvent resultEvents = new 
HostComponentsUpdateEvent(hostComponentUpdates);
-      //TODO add logging and metrics posting
-      eventBus.post(resultEvents);
-    }
+    HostComponentsUpdateEvent resultEvents = new 
HostComponentsUpdateEvent(hostComponentUpdates);
+    //TODO add logging and metrics posting
+    m_eventBus.post(resultEvents);
   }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
index e080bd9..42f22ba 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -18,9 +18,9 @@
 
 package org.apache.ambari.server.events.publishers;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.RequestUpdateEvent;
@@ -35,11 +35,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
-public class RequestUpdateEventPublisher {
-
-  private final Long TIMEOUT = 1000L;
-  private ConcurrentHashMap<Long, Long> previousTime = new 
ConcurrentHashMap<>();
-  private ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new 
ConcurrentHashMap<>();
+public class RequestUpdateEventPublisher extends 
BufferedUpdateEventPublisher<RequestUpdateEvent> {
 
   @Inject
   private HostRoleCommandDAO hostRoleCommandDAO;
@@ -53,27 +49,25 @@ public class RequestUpdateEventPublisher {
   @Inject
   private ClusterDAO clusterDAO;
 
-  public void publish(RequestUpdateEvent event, EventBus m_eventBus) {
-    Long eventTime = System.currentTimeMillis();
-    Long requestId = event.getRequestId();
-    if (!previousTime.containsKey(requestId)) {
-      previousTime.put(requestId, 0L);
+  @Override
+  public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus 
m_eventBus) {
+    Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
+    for (RequestUpdateEvent event : events) {
+      Long requestId = event.getRequestId();
+      if (filteredRequests.containsKey(requestId)) {
+        RequestUpdateEvent filteredRequest = filteredRequests.get(requestId);
+        filteredRequest.setEndTime(event.getEndTime());
+        filteredRequest.setRequestStatus(event.getRequestStatus());
+        filteredRequest.setRequestContext(event.getRequestContext());
+        
filteredRequest.getHostRoleCommands().removeAll(event.getHostRoleCommands());
+        
filteredRequest.getHostRoleCommands().addAll(event.getHostRoleCommands());
+      } else {
+        filteredRequests.put(requestId, event);
+      }
     }
-    if (eventTime - previousTime.get(requestId) <= TIMEOUT && 
!buffer.containsKey(requestId)) {
-      buffer.put(event.getRequestId(), event);
-      Executors.newScheduledThreadPool(1).schedule(new 
RequestEventRunnable(requestId, m_eventBus),
-          TIMEOUT, TimeUnit.MILLISECONDS);
-    } else if (buffer.containsKey(requestId)) {
-      //merge available buffer content with arrived
-      buffer.get(requestId).setEndTime(event.getEndTime());
-      buffer.get(requestId).setRequestStatus(event.getRequestStatus());
-      buffer.get(requestId).setRequestContext(event.getRequestContext());
-      
buffer.get(requestId).getHostRoleCommands().removeAll(event.getHostRoleCommands());
-      
buffer.get(requestId).getHostRoleCommands().addAll(event.getHostRoleCommands());
-    } else {
-      previousTime.put(requestId, eventTime);
-      //TODO add logging and metrics posting
-      m_eventBus.post(fillRequest(event));
+    for (RequestUpdateEvent requestUpdateEvent : filteredRequests.values()) {
+      RequestUpdateEvent filled = fillRequest(requestUpdateEvent);
+      m_eventBus.post(filled);
     }
   }
 
@@ -94,24 +88,4 @@ public class RequestUpdateEventPublisher {
     }
     return event;
   }
-
-  private class RequestEventRunnable implements Runnable {
-
-    private final long requestId;
-    private final EventBus eventBus;
-
-    public RequestEventRunnable(long requestId, EventBus eventBus) {
-      this.requestId = requestId;
-      this.eventBus = eventBus;
-    }
-
-    @Override
-    public void run() {
-      RequestUpdateEvent resultEvent = buffer.get(requestId);
-      //TODO add logging and metrics posting
-      eventBus.post(fillRequest(resultEvent));
-      buffer.remove(requestId);
-      previousTime.remove(requestId);
-    }
-  }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 8f45859..6dfef43 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -19,52 +19,62 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import 
org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
+import 
org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
 import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.apache.ambari.server.state.State;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
 
 @Singleton
 public class ServiceUpdateEventPublisher extends 
BufferedUpdateEventPublisher<ServiceUpdateEvent> {
+  private Map<String, Map<String, State>> states = new HashMap<>();
 
-  @Override
-  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
-    return new ServiceEventRunnable(m_eventBus);
-  }
-
-  private class ServiceEventRunnable implements Runnable {
 
-    private final EventBus eventBus;
-
-    public ServiceEventRunnable(EventBus eventBus) {
-      this.eventBus = eventBus;
+  @Override
+  public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus 
eventBus) {
+    List<ServiceUpdateEvent> filtered = new ArrayList<>();
+    for (ServiceUpdateEvent event : events) {
+      int pos = filtered.indexOf(event);
+      if (pos != -1) {
+        if (event.isStateChanged()) {
+          filtered.get(pos).setStateChanged(true);
+        }
+        if (event.getMaintenanceState() != null) {
+          filtered.get(pos).setMaintenanceState(event.getMaintenanceState());
+        }
+      } else {
+        filtered.add(event);
+      }
     }
+    for (ServiceUpdateEvent serviceUpdateEvent : filtered) {
+      // calc state
+      if (serviceUpdateEvent.isStateChanged()) {
+        ServiceCalculatedState serviceCalculatedState =
+            
ServiceCalculatedStateFactory.getServiceStateProvider(serviceUpdateEvent.getServiceName());
+        State serviceState =
+            
serviceCalculatedState.getState(serviceUpdateEvent.getClusterName(), 
serviceUpdateEvent.getServiceName());
 
-    @Override
-    public void run() {
-      List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
-      if (serviceUpdates.isEmpty()) {
-        return;
-      }
-      List<ServiceUpdateEvent> filtered = new ArrayList<>();
-      for (ServiceUpdateEvent event : serviceUpdates) {
-        int pos = filtered.indexOf(event);
-        if (pos != -1) {
-          if (event.getState() != null) {
-            filtered.get(pos).setState(event.getState());
-          }
-          if (event.getMaintenanceState() != null) {
-            filtered.get(pos).setMaintenanceState(event.getMaintenanceState());
-          }
-        } else {
-          filtered.add(event);
+        String serviceName = serviceUpdateEvent.getServiceName();
+        String clusterName = serviceUpdateEvent.getClusterName();
+
+        // retrieve state from cache
+        // don't send update when state was not changed and update doesn't 
have maintenance info
+        if (states.containsKey(clusterName) && 
states.get(clusterName).containsKey(serviceName)
+            && states.get(clusterName).get(serviceName).equals(serviceState)
+            && serviceUpdateEvent.getMaintenanceState() == null) {
+          continue;
         }
+        states.computeIfAbsent(clusterName, c -> new 
HashMap<>()).put(serviceName, serviceState);
+        serviceUpdateEvent.setState(serviceState);
       }
-      for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
-        eventBus.post(serviceUpdateEvent);
-      }
+
+      eventBus.post(serviceUpdateEvent);
     }
   }
 }

Reply via email to