Optimize and fix request status change

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1678b34b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1678b34b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1678b34b

Branch: refs/heads/audit_logging
Commit: 1678b34b462de2d5278977d7641a0a8ca482581e
Parents: dfc98fa
Author: Daniel Gergely <dgerg...@hortonworks.com>
Authored: Tue Mar 22 11:30:54 2016 +0100
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Thu Mar 24 13:06:50 2016 +0100

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     | 64 ++++++++++++++++++--
 1 file changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1678b34b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 2d6aa52..9ebe543 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -23,8 +23,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.annotations.Experimental;
@@ -35,6 +40,7 @@ import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.api.services.ResultStatus;
 import org.apache.ambari.server.audit.event.AuditEvent;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.audit.event.OperationStatusAuditEvent;
@@ -127,11 +133,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
   /**
    * Caches to store current request and task statuses.
-   * It is used for deduplicating status reports in order to avoid autitlog 
entry duplication
+   * It is used for avoiding audit log entry duplication
    */
   private Map<Long, HostRoleStatus> temporaryStatusCache = new HashMap<Long, 
HostRoleStatus>();
   private Map<Long, HostRoleStatus> temporaryTaskStatusCache = new 
HashMap<Long, HostRoleStatus>();
 
+  /**
+   * Stores the host role command entities that are not completed for a 
request id
+   * It is used to calculate the summary state of the request for audit logging
+   */
+  private Map<Long, List<HostRoleCommandEntity>> tasksForRequest = new 
HashMap<>();
+
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
@@ -467,6 +479,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     for (HostRoleCommandEntity commandEntity : commandEntities) {
       CommandReport report = taskReports.get(commandEntity.getTaskId());
 
+      boolean statusChanged = false;
+
       switch (commandEntity.getStatus()) {
         case ABORTED:
           // We don't want to overwrite statuses for ABORTED tasks with
@@ -486,7 +500,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
           }
 
           commandEntity.setStatus(status);
-          auditLog(commandEntity, requestId);
+          statusChanged = true;
           break;
       }
 
@@ -505,6 +519,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         long stageId = requestStageIds[1];
         if (requestDAO.getLastStageId(requestId).equals(stageId)) {
           requestsToCheck.add(requestId);
+          if(statusChanged) {
+            auditLog(commandEntity, requestId); // wrong requestId !!!
+          }
         }
       }
     }
@@ -798,14 +815,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   private void auditLog(HostRoleCommandEntity commandEntity, Long requestId) {
     if(requestId != null) {
-      List<Stage> stages = getAllStages(requestId);
-      CalculatedStatus cs = CalculatedStatus.statusFromStages(stages);
+
+      CalculatedStatus cs = calculateStatus(commandEntity, requestId);
+
       if (!temporaryStatusCache.containsKey(requestId) || 
temporaryStatusCache.get(requestId) != cs.getStatus()) {
 
+        RequestEntity request = requestDAO.findByPK(requestId);
+        String context = request != null ? request.getRequestContext() : null;
         AuditEvent auditEvent = OperationStatusAuditEvent.builder()
           .withRequestId(String.valueOf(requestId))
           .withStatus(String.valueOf(cs.getStatus()))
-          .withRequestContext(stages.isEmpty() ? "" : 
stages.get(0).getRequestContext())
+          .withRequestContext(context)
           .withTimestamp(System.currentTimeMillis())
           .build();
         auditLogger.log(auditEvent);
@@ -817,6 +837,40 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   /**
+   * Calculates summary status for the given request id for the new command 
entity status
+   * @param commandEntity
+   * @param requestId
+   * @return
+   */
+  private CalculatedStatus calculateStatus(HostRoleCommandEntity 
commandEntity, Long requestId) {
+    if(!tasksForRequest.containsKey(requestId)) {
+      tasksForRequest.put(requestId, new LinkedList<HostRoleCommandEntity>());
+    }
+    for(Iterator<HostRoleCommandEntity> it = 
tasksForRequest.get(requestId).iterator(); it.hasNext();) {
+      HostRoleCommandEntity hrce = it.next();
+      if(commandEntity.getTaskId().equals(hrce.getTaskId())) {
+        it.remove();
+      }
+    }
+    tasksForRequest.get(requestId).add(commandEntity);
+
+    CalculatedStatus cs = 
CalculatedStatus.statusFromTaskEntities(tasksForRequest.get(requestId), false);
+
+    // if all task status is completed, we can remove it from the container
+    boolean hasInProgress = false;
+    for(HostRoleCommandEntity hrce : tasksForRequest.get(requestId)) {
+      if(!hrce.getStatus().isCompletedState()) {
+        hasInProgress = true;
+        break;
+      }
+    }
+    if(!hasInProgress) {
+      tasksForRequest.remove(requestId);
+    }
+    return cs;
+  }
+
+  /**
    * Logs task status change
    * @param commandEntity
    * @param requestId

Reply via email to