This is an automated email from the ASF dual-hosted git repository. mpapirkovskyy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new d497004 AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (#2143) d497004 is described below commit d497004d39639102d4092509ef8d955840bf3ecc Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org> AuthorDate: Wed Aug 22 14:23:37 2018 +0300 AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (#2143) * AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (mpapirkovskyy) * AMBARI-24521. CLONE - Requests STOMP topic sent updates for host check request. (mpapirkovskyy) --- .../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++- .../events/listeners/tasks/TaskStatusListener.java | 46 ++++++++++++++++------ 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 3543486..1a055b3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -436,8 +436,18 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands); taskEventPublisher.publish(taskCreateEvent); List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId()); - STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity, - hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities)); + + // "requests" STOMP topic is used for cluster-related requests only. + // Requests without clusters (like host checks) should be posted to a separate topic. 
+ if (clusterName != null) { + STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity, + hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities)); + } else { + LOG.debug("No STOMP request update event was fired for new request due no cluster related, " + + "request id: {}, command name: {}", + requestEntity.getRequestId(), + requestEntity.getCommandName()); + } } @Override diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java index 0570fdf..b188729 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.ambari.server.ClusterNotFoundException; import org.apache.ambari.server.EagerSingleton; import org.apache.ambari.server.Role; import org.apache.ambari.server.actionmanager.HostRoleCommand; @@ -123,7 +122,7 @@ public class TaskStatusListener { * @param event Consumes {@link TaskUpdateEvent}. 
*/ @Subscribe - public void onTaskUpdateEvent(TaskUpdateEvent event) throws ClusterNotFoundException { + public void onTaskUpdateEvent(TaskUpdateEvent event) { LOG.debug("Received task update event {}", event); List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands(); List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new ArrayList<>(); @@ -145,13 +144,27 @@ public class TaskStatusListener { requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId()); if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) { - Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>(); - hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(), - hostRoleCommand.getRequestId(), - hostRoleCommand.getStatus(), - hostRoleCommand.getHostName())); - requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(), - activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands)); + // Ignore requests not related to any cluster. "requests" topic is used for cluster requests only. 
+ Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId(); + if (clusterId != null && clusterId != -1) { + Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>(); + hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(), + hostRoleCommand.getRequestId(), + hostRoleCommand.getStatus(), + hostRoleCommand.getHostName())); + requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(), + activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands)); + } else { + LOG.debug("No STOMP request update event was fired for host component status change due no cluster related, " + + "request id: {}, role: {}, role command: {}, host: {}, task id: {}, old state: {}, new state: {}", + hostRoleCommand.getRequestId(), + hostRoleCommand.getRole(), + hostRoleCommand.getRoleCommand(), + hostRoleCommand.getHostName(), + hostRoleCommand.getTaskId(), + activeTasksMap.get(reportedTaskId).getStatus(), + hostRoleCommand.getStatus()); + } } } } @@ -264,7 +277,8 @@ public class TaskStatusListener { // Request entity of the hostrolecommand should be persisted before publishing task create event assert requestEntity != null; Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stageEntityPK); - ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stageEntityPKs); + ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), + stageEntityPKs, requestEntity.getClusterId()); activeRequestMap.put(requestId, request); } } @@ -524,11 +538,14 @@ public class TaskStatusListener { private HostRoleStatus status; private HostRoleStatus displayStatus; private Set <StageEntityPK> stageEntityPks; + private Long clusterId; - public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) { + public ActiveRequest(HostRoleStatus status, HostRoleStatus 
displayStatus, Set<StageEntityPK> stageEntityPks, + Long clusterId) { this.status = status; this.displayStatus = displayStatus; this.stageEntityPks = stageEntityPks; + this.clusterId = clusterId; } public HostRoleStatus getStatus() { @@ -559,6 +576,13 @@ public class TaskStatusListener { stageEntityPks.add(stageEntityPK); } + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } } /**