Optimize and fix request status change
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1678b34b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1678b34b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1678b34b Branch: refs/heads/audit_logging Commit: 1678b34b462de2d5278977d7641a0a8ca482581e Parents: dfc98fa Author: Daniel Gergely <dgerg...@hortonworks.com> Authored: Tue Mar 22 11:30:54 2016 +0100 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Thu Mar 24 13:06:50 2016 +0100 ---------------------------------------------------------------------- .../actionmanager/ActionDBAccessorImpl.java | 64 ++++++++++++++++++-- 1 file changed, 59 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/1678b34b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 2d6aa52..9ebe543 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -23,8 +23,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.ambari.annotations.Experimental; @@ -35,6 +40,7 @@ import org.apache.ambari.annotations.TransactionalLock.LockType; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; +import org.apache.ambari.server.api.services.ResultStatus; import org.apache.ambari.server.audit.event.AuditEvent; import org.apache.ambari.server.audit.AuditLogger; import org.apache.ambari.server.audit.event.OperationStatusAuditEvent; @@ -127,11 +133,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { /** * Caches to store current request and task statuses. - * It is used for deduplicating status reports in order to avoid autitlog entry duplication + * It is used for avoiding audit log entry duplication */ private Map<Long, HostRoleStatus> temporaryStatusCache = new HashMap<Long, HostRoleStatus>(); private Map<Long, HostRoleStatus> temporaryTaskStatusCache = new HashMap<Long, HostRoleStatus>(); + /** + * Stores the host role command entities that are not completed for a request id + * It is used to calculate the summary state of the request for audit logging + */ + private Map<Long, List<HostRoleCommandEntity>> tasksForRequest = new HashMap<>(); + private Cache<Long, HostRoleCommand> hostRoleCommandCache; private long cacheLimit; //may be exceeded to store tasks from one request @@ -467,6 +479,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { for (HostRoleCommandEntity commandEntity : commandEntities) { CommandReport report = taskReports.get(commandEntity.getTaskId()); + boolean statusChanged = false; + switch (commandEntity.getStatus()) { case ABORTED: // We don't want to overwrite statuses for ABORTED tasks with @@ -486,7 +500,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } commandEntity.setStatus(status); - auditLog(commandEntity, requestId); + statusChanged = true; break; } @@ -505,6 +519,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { long stageId = requestStageIds[1]; if (requestDAO.getLastStageId(requestId).equals(stageId)) { requestsToCheck.add(requestId); + if(statusChanged) { + auditLog(commandEntity, requestId); // wrong requestId !!! + } } } } @@ -798,14 +815,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { */ private void auditLog(HostRoleCommandEntity commandEntity, Long requestId) { if(requestId != null) { - List<Stage> stages = getAllStages(requestId); - CalculatedStatus cs = CalculatedStatus.statusFromStages(stages); + + CalculatedStatus cs = calculateStatus(commandEntity, requestId); + if (!temporaryStatusCache.containsKey(requestId) || temporaryStatusCache.get(requestId) != cs.getStatus()) { + RequestEntity request = requestDAO.findByPK(requestId); + String context = request != null ? request.getRequestContext() : null; AuditEvent auditEvent = OperationStatusAuditEvent.builder() .withRequestId(String.valueOf(requestId)) .withStatus(String.valueOf(cs.getStatus())) - .withRequestContext(stages.isEmpty() ? "" : stages.get(0).getRequestContext()) + .withRequestContext(context) .withTimestamp(System.currentTimeMillis()) .build(); auditLogger.log(auditEvent); @@ -817,6 +837,40 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } /** + * Calculates summary status for the given request id for the new command entity status + * @param commandEntity + * @param requestId + * @return + */ + private CalculatedStatus calculateStatus(HostRoleCommandEntity commandEntity, Long requestId) { + if(!tasksForRequest.containsKey(requestId)) { + tasksForRequest.put(requestId, new LinkedList<HostRoleCommandEntity>()); + } + for(Iterator<HostRoleCommandEntity> it = tasksForRequest.get(requestId).iterator(); it.hasNext();) { + HostRoleCommandEntity hrce = it.next(); + if(commandEntity.getTaskId().equals(hrce.getTaskId())) { + it.remove(); + } + } + tasksForRequest.get(requestId).add(commandEntity); + + CalculatedStatus cs = CalculatedStatus.statusFromTaskEntities(tasksForRequest.get(requestId), false); + + // if all task status is completed, we can remove it from the container + boolean hasInProgress = false; + for(HostRoleCommandEntity hrce : tasksForRequest.get(requestId)) { + if(!hrce.getStatus().isCompletedState()) { + hasInProgress = true; + break; + } + } + if(!hasInProgress) { + tasksForRequest.remove(requestId); + } + return cs; + } + + /** * Logs task status change * @param commandEntity * @param requestId