Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 80d804849 -> 32ee5995b


AMBARI-20749. Ambari data purging. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/32ee5995
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/32ee5995
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/32ee5995

Branch: refs/heads/branch-2.5
Commit: 32ee5995bdaaabeb52e5aa6ef4533eb70d9bf809
Parents: 80d8048
Author: Toader, Sebastian <stoa...@hortonworks.com>
Authored: Wed Jun 14 16:39:07 2017 +0200
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Wed Jun 14 16:39:50 2017 +0200

----------------------------------------------------------------------
 ambari-server/sbin/ambari-server                |  6 +--
 .../checks/DatabaseConsistencyCheckHelper.java  |  4 +-
 .../ambari/server/cleanup/CleanupDriver.java    | 15 +++++---
 .../ambari/server/cleanup/CleanupService.java   | 19 +++++++++-
 .../server/cleanup/CleanupServiceImpl.java      | 33 ++++++++++++++--
 .../server/orm/dao/HostRoleCommandDAO.java      |  7 ++--
 .../ambari/server/orm/dao/RequestDAO.java       | 24 ++++++------
 .../server/orm/dao/TopologyHostTaskDAO.java     |  7 +++-
 .../orm/dao/TopologyLogicalRequestDAO.java      |  6 ++-
 .../server/orm/dao/TopologyLogicalTaskDAO.java  |  8 +++-
 .../orm/entities/TopologyHostTaskEntity.java    |  2 +-
 .../entities/TopologyLogicalRequestEntity.java  |  2 +-
 .../orm/entities/TopologyLogicalTaskEntity.java |  2 +-
 ambari-server/src/main/python/ambari-server.py  | 16 ++++----
 .../src/main/python/ambari_server/dbCleanup.py  | 34 ++++++++---------
 .../main/python/ambari_server/setupActions.py   |  2 +-
 .../server/cleanup/CleanupServiceImplTest.java  | 40 +++++++++++++++++++-
 17 files changed, 162 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/sbin/ambari-server
----------------------------------------------------------------------
diff --git a/ambari-server/sbin/ambari-server b/ambari-server/sbin/ambari-server
index 7044096..cc1c923 100755
--- a/ambari-server/sbin/ambari-server
+++ b/ambari-server/sbin/ambari-server
@@ -174,8 +174,8 @@ case "${1:-}" in
         echo -e "Setting up SSO authentication properties..."
         $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
         ;;
-  db-cleanup)
-        echo -e "Cleanup database..."
+  db-purge-history)
+        echo -e "Purge database history..."
         $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
         ;;
   install-mpack)
@@ -196,7 +196,7 @@ case "${1:-}" in
         ;;
   *)
         echo "Usage: $AMBARI_EXECUTABLE
-        
{start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-cleanup|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos}
 [options]
+        
{start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-purge-history|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos}
 [options]
         Use $AMBARI_PYTHON_EXECUTABLE <action> --help to get details on 
options available.
         Or, simply invoke ambari-server.py --help to print the options."
         exit 1

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index 70c3661..29c1222 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -328,7 +328,7 @@ public class DatabaseConsistencyCheckHelper {
 
         if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
           warning("The database table {} is currently {} MB (limit is {}) and 
may impact performance. It is recommended " +
-                  "that you reduce its size by executing \"ambari-server 
db-cleanup\".",
+                  "that you reduce its size by executing \"ambari-server 
db-purge-history\".",
                   tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
         } else if (tableSizeInMB != null && tableSizeInMB < 
TABLE_SIZE_LIMIT_MB) {
           LOG.info(String.format("The database table %s is currently %.3f MB 
and is within normal limits (%.3f)",
@@ -348,7 +348,7 @@ public class DatabaseConsistencyCheckHelper {
 
           if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
             warning("The database table {} currently has {} rows (limit is {}) 
and may impact performance. It is " +
-                    "recommended that you reduce its size by executing 
\"ambari-server db-cleanup\".",
+                    "recommended that you reduce its size by executing 
\"ambari-server db-purge-history\".",
                     tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
           } else if (tableRowCount != -1 && tableRowCount < 
TABLE_ROW_COUNT_LIMIT) {
             LOG.info(String.format("The database table %s currently has %d 
rows and is within normal limits (%d)", tableName, tableRowCount, 
TABLE_ROW_COUNT_LIMIT));

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index 788290b..b4c8369 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -49,7 +49,7 @@ public class CleanupDriver {
   private static Options getOptions() {
     Options options = new Options();
     options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The 
cluster name").required().type(String.class).hasArg().valueSeparator(' 
').build());
-    options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("The day 
from which the cleanup 
runs").required().type(String.class).hasArg().valueSeparator(' ').build());
+    options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up 
until data will be 
purged.").required().type(String.class).hasArg().valueSeparator(' ').build());
     return options;
   }
 
@@ -67,7 +67,7 @@ public class CleanupDriver {
     } catch (Exception exp) {
       System.err.println("Parsing failed.  Reason: " + exp.getMessage());
       LOGGER.error("Parsing failed.  Reason: ", exp);
-      formatter.printHelp("cleanup", getOptions());
+      formatter.printHelp("db-purge-history", getOptions());
       System.exit(1);
     }
     return ctx;
@@ -75,7 +75,7 @@ public class CleanupDriver {
 
 
   public static void main(String... args) throws Exception {
-    LOGGER.info("DB-CLEANUP - Starting the cleanup process ...");
+    LOGGER.info("DB-PURGE - Starting the database purge process ...");
 
     CleanupContext cleanupContext = processArguments(args);
 
@@ -86,12 +86,17 @@ public class CleanupDriver {
     injector.getInstance(AmbariJpaPersistService.class).start();
 
     CleanupServiceImpl cleanupService = 
injector.getInstance(CleanupServiceImpl.class);
-    long affected = cleanupService.cleanup(new 
TimeBasedCleanupPolicy(cleanupContext.getClusterName(), 
cleanupContext.getFromDayTimestamp()));
+    CleanupService.CleanupResult result = cleanupService.cleanup(new 
TimeBasedCleanupPolicy(cleanupContext.getClusterName(), 
cleanupContext.getFromDayTimestamp()));
 
     // explicitly stopping the persist service
     injector.getInstance(AmbariJpaPersistService.class).stop();
 
-    LOGGER.info("DB-CLEANUP - completed. Number of affected records [{}]", 
affected);
+    if (result.getErrorCount() > 0) {
+      LOGGER.warn("DB-PURGE - completed with error, check Ambari Server log 
for details ! Number of affected records [{}]", result.getAffectedRows());
+      System.exit(2);
+    }
+
+    LOGGER.info("DB-PURGE - completed. Number of affected records [{}]", 
result.getAffectedRows());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
index 880207c..8ac0b92 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
@@ -23,11 +23,28 @@ package org.apache.ambari.server.cleanup;
  */
 public interface CleanupService<T> {
 
+  interface CleanupResult {
+    /**
+     * Returns the number of rows deleted by the cleanup
+     * @return The total number of rows deleted by the cleanup
+     */
+    long getAffectedRows();
+
+    /**
+     * The cleanup process executes the specific cleanup operations via
+     * {@link org.apache.ambari.server.orm.dao.Cleanable} implementations.
+     * Some of these may fail during the cleanup process. This method returns
+     * the number of failed cleanups.
+     * @return The number of failed cleanups.
+     */
+    int getErrorCount();
+  }
+
   /**
    * Triggers the cleanup for the given cleanup policy.
    *
    * @param cleanupPolicy the cleanup policy based on which the cleanup is 
executed.
    * @return the affected "rows"
    */
-  long cleanup(T cleanupPolicy);
+  CleanupResult cleanup(T cleanupPolicy);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
index 29a9041..0436c92 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
@@ -34,6 +34,25 @@ import com.google.inject.Singleton;
 public class CleanupServiceImpl implements 
CleanupService<TimeBasedCleanupPolicy> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(CleanupServiceImpl.class);
 
+  class Result implements CleanupResult {
+    private final long affectedRows;
+    private final int errorCount;
+
+    public Result(long affectedRows, int errorCount) {
+      this.affectedRows = affectedRows;
+      this.errorCount = errorCount;
+    }
+
+    @Override
+    public long getAffectedRows() {
+      return affectedRows;
+    }
+
+    @Override
+    public int getErrorCount() {
+      return errorCount;
+    }
+  }
 
   // this Set is automatically populated by the guice framework (based on the 
cleanup interface)
   private Set<Cleanable> cleanables;
@@ -54,13 +73,21 @@ public class CleanupServiceImpl implements 
CleanupService<TimeBasedCleanupPolicy
    * @param cleanupPolicy the policy based on which the cleanup is done
    * @return the number of affected rows
    */
-  public long cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
+  public CleanupResult cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
     long affectedRows = 0;
+    int errorCount = 0;
     for (Cleanable cleanable : cleanables) {
       LOGGER.info("Running the purge process for DAO: [{}] with cleanup 
policy: [{}]", cleanable, cleanupPolicy);
-      affectedRows += cleanable.cleanup(cleanupPolicy);
+      try {
+        affectedRows += cleanable.cleanup(cleanupPolicy);
+      }
+      catch (Exception ex) {
+        LOGGER.error("Running the purge process for DAO: [{}] failed with: 
{}", cleanable, ex);
+        errorCount++;
+      }
     }
-    return affectedRows;
+
+    return new Result(affectedRows, errorCount);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index b9e1fab..14c8443 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -63,6 +63,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -933,9 +934,9 @@ public class HostRoleCommandDAO {
     }
   }
 
-  public List<Long> 
findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+  public Set<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> 
requestStageIds) {
     EntityManager entityManager = entityManagerProvider.get();
-    List<Long> taskIds = new ArrayList<Long>();
+    List<Long> taskIds = new ArrayList<>();
     for (RequestDAO.StageEntityPK requestIds : requestStageIds) {
       TypedQuery<Long> hostRoleCommandQuery =
               
entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds",
 Long.class);
@@ -946,6 +947,6 @@ public class HostRoleCommandDAO {
       taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
     }
 
-    return taskIds;
+    return Sets.newHashSet(taskIds);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 38c0977..4007237 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -215,8 +215,8 @@ public class RequestDAO implements Cleanable {
    * Retrieves from the database for a cluster, or specifically for 
non-cluster requests.
    * This method should be considered temporary until Request/Stage/Task 
cleanup is achieved.
    *
-   * @param maxResults  the max number to return
-   * @param ascOrder    {@code true} to sort by requestId ascending, {@code 
false} for descending
+   * @param limit  the max number to return
+   * @param sortAscending    {@code true} to sort by requestId ascending, 
{@code false} for descending
    * @param clusterId   the cluster to find, or {@code null} to search for 
requests without cluster
    */
   @RequiresSession
@@ -268,12 +268,12 @@ public class RequestDAO implements Cleanable {
    * Search for all request ids in Upgrade table
    * @return the list of request ids
    */
-  private List<Long> findAllRequestIdsFromUpgrade() {
+  private Set<Long> findAllRequestIdsFromUpgrade() {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> upgradeQuery =
             entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", 
Long.class);
 
-    return daoUtils.selectList(upgradeQuery);
+    return Sets.newHashSet(daoUtils.selectList(upgradeQuery));
   }
 
   /**
@@ -376,7 +376,7 @@ public class RequestDAO implements Cleanable {
 
       // find request ids from Upgrade table and exclude these ids from
       // request ids set that we already have. We don't want to make any 
changes for upgrade
-      Set<Long> requestIdsFromUpgrade = 
Sets.newHashSet(findAllRequestIdsFromUpgrade());
+      Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
       Iterator<StageEntityPK> requestStageIdsIterator =  
requestStageIds.iterator();
       while (requestStageIdsIterator.hasNext()) {
         StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
@@ -392,24 +392,24 @@ public class RequestDAO implements Cleanable {
       }
 
       // find task ids using request stage ids
-      Set<Long> taskIds = 
Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+      Set<Long> taskIds = 
hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
       LinkedList<String> params = new LinkedList<>();
       params.add("stageId");
       params.add("requestId");
 
       // find host task ids, to find related host requests and also to remove 
needed host tasks
-      List<Long> hostTaskIds = new ArrayList<>();
+      Set<Long> hostTaskIds = new HashSet<>();
       if (taskIds != null && !taskIds.isEmpty()) {
-        hostTaskIds = 
topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+        hostTaskIds = 
topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
       }
 
       // find host request ids by host task ids to remove later needed host 
requests
-      List<Long> hostRequestIds = new ArrayList<>();
+      Set<Long> hostRequestIds = new HashSet<>();
       if (!hostTaskIds.isEmpty()) {
         hostRequestIds = 
topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
       }
 
-      List<Long> topologyRequestIds = new ArrayList<>();
+      Set<Long> topologyRequestIds = new HashSet<>();
       if (!hostRequestIds.isEmpty()) {
         topologyRequestIds = 
topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
       }
@@ -420,9 +420,9 @@ public class RequestDAO implements Cleanable {
               "ExecutionCommandEntity.removeByTaskIds", 
ExecutionCommandEntity.class);
       affectedRows += cleanTableByIds(taskIds, "taskIds", 
"TopologyLogicalTask", policy.getToDateInMillis(),
               "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", 
TopologyLogicalTaskEntity.class);
-      affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), 
"hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+      affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", 
"TopologyHostTask", policy.getToDateInMillis(),
               "TopologyHostTaskEntity.removeByTaskIds", 
TopologyHostTaskEntity.class);
-      affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), 
"hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+      affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", 
"TopologyHostRequest", policy.getToDateInMillis(),
               "TopologyHostRequestEntity.removeByIds", 
TopologyHostRequestEntity.class);
       for (Long topologyRequestId : topologyRequestIds) {
         topologyRequestDAO.removeByPK(topologyRequestId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index eea8032..d7fc810 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -27,7 +28,9 @@ import 
org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 @Singleton
 public class TopologyHostTaskDAO {
@@ -52,14 +55,14 @@ public class TopologyHostTaskDAO {
   }
 
   @RequiresSession
-  public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+  public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyHostTaskQuery =
             
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
 Long.class);
 
     topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
 
-    return daoUtils.selectList(topologyHostTaskQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
   }
 
   @RequiresSession

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index 32a38da..bc61e0b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -26,6 +27,7 @@ import 
org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 import java.util.List;
+import java.util.Set;
 
 @Singleton
 public class TopologyLogicalRequestDAO {
@@ -61,13 +63,13 @@ public class TopologyLogicalRequestDAO {
   }
 
   @RequiresSession
-  public List<Long> findRequestIdsByIds(List<Long> ids) {
+  public Set<Long> findRequestIdsByIds(Set<Long> ids) {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyLogicalRequestQuery =
             
entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", 
Long.class);
 
     topologyLogicalRequestQuery.setParameter("ids", ids);
 
-    return daoUtils.selectList(topologyLogicalRequestQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery));
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index 3a72aed..4660766 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -26,7 +27,10 @@ import 
org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
+
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 @Singleton
 public class TopologyLogicalTaskDAO {
@@ -42,14 +46,14 @@ public class TopologyLogicalTaskDAO {
   }
 
   @RequiresSession
-  public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> 
physicalTaskIds) {
+  public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) 
{
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyHostTaskQuery =
             
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds",
 Long.class);
 
     topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
 
-    return daoUtils.selectList(topologyHostTaskQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
   }
 
   @RequiresSession

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index 37830b7..549ba7e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -42,7 +42,7 @@ import java.util.Collection;
   @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
       query = "SELECT req FROM TopologyHostTaskEntity req WHERE 
req.topologyHostRequestEntity.id = :hostRequestId"),
   @NamedQuery(name = 
"TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
-      query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE 
tht.id IN :hostTaskIds"),
+      query = "SELECT DISTINCT tht.hostRequestId from TopologyHostTaskEntity 
tht WHERE tht.id IN :hostTaskIds"),
   @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
       query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN 
:hostTaskIds")
 })

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 1536b80..ae13a49 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -36,7 +36,7 @@ import java.util.Collection;
 @Entity
 @Table(name = "topology_logical_request")
 @NamedQueries({
-  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = 
"SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity 
logicalrequest WHERE logicalrequest.id IN :ids")
+  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = 
"SELECT DISTINCT t.topologyLogicalRequestEntity.topologyRequestId from 
TopologyHostRequestEntity t WHERE t.id IN :ids")
 })
 public class TopologyLogicalRequestEntity {
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index 2954863..15238ff 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -36,7 +36,7 @@ import javax.persistence.TableGenerator;
   pkColumnName = "sequence_name", valueColumnName = "sequence_value",
   pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
 @NamedQueries({
-  @NamedQuery(name = 
"TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT 
logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE 
logicaltask.physicalTaskId IN :physicalTaskIds"),
+  @NamedQuery(name = 
"TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT 
DISTINCT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask 
WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
   @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", 
query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE 
logicaltask.physicalTaskId IN :taskIds")
 })
 public class TopologyLogicalTaskEntity {

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py 
b/ambari-server/src/main/python/ambari-server.py
index 235ef95..71459fb 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -42,7 +42,7 @@ from ambari_server.setupHttps import setup_https, 
setup_truststore
 from ambari_server.setupMpacks import install_mpack, uninstall_mpack, 
upgrade_mpack, STACK_DEFINITIONS_RESOURCE_NAME, \
   SERVICE_DEFINITIONS_RESOURCE_NAME, MPACKS_RESOURCE_NAME
 from ambari_server.setupSso import setup_sso
-from ambari_server.dbCleanup import db_cleanup
+from ambari_server.dbCleanup import db_purge
 from ambari_server.hostUpdate import update_host_names
 from ambari_server.checkDatabase import check_database
 from ambari_server.enableStack import enable_stack_version
@@ -52,7 +52,7 @@ from ambari_server.setupActions import BACKUP_ACTION, 
LDAP_SETUP_ACTION, LDAP_SY
   SETUP_ACTION, SETUP_SECURITY_ACTION,START_ACTION, STATUS_ACTION, 
STOP_ACTION, RESTART_ACTION, UPGRADE_ACTION, \
   UPGRADE_STACK_ACTION, SETUP_JCE_ACTION, SET_CURRENT_ACTION, START_ACTION, 
STATUS_ACTION, STOP_ACTION, UPGRADE_ACTION, \
   UPGRADE_STACK_ACTION, SETUP_JCE_ACTION, SET_CURRENT_ACTION, 
ENABLE_STACK_ACTION, SETUP_SSO_ACTION, \
-  DB_CLEANUP_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, 
UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
+  DB_PURGE_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, 
UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
 from ambari_server.setupSecurity import setup_ldap, sync_ldap, 
setup_master_key, setup_ambari_krb5_jaas, setup_pam
 from ambari_server.userInput import get_validated_string_input
 from ambari_server.kerberos_setup import setup_kerberos
@@ -200,11 +200,11 @@ def restart(args):
 
 
 @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
-def database_cleanup(args):
-  logger.info("Database cleanup.")
+def database_purge(args):
+  logger.info("Purging historical data from database.")
   if args.silent:
     stop(args)
-  db_cleanup(args)
+  db_purge(args)
 
 #
 # The Ambari Server status.
@@ -472,7 +472,7 @@ def init_parser_options(parser):
                     help="Print verbose status messages")
   parser.add_option("-s", "--silent",
                     action="store_true", dest="silent", default=False,
-                    help="Silently accepts default prompt values. For 
db-cleanup command, silent mode will stop ambari server.")
+                    help="Silently accepts default prompt values. For 
db-purge-history command, silent mode will stop ambari server.")
   parser.add_option('-g', '--debug', action="store_true", dest='debug', 
default=False,
                     help="Start ambari-server in debug mode")
   parser.add_option('-y', '--suspend-start', action="store_true", 
dest='suspend_start', default=False,
@@ -517,7 +517,7 @@ def init_parser_options(parser):
                     help="Specify stack version that needs to be enabled. All 
other stacks versions will be disabled")
   parser.add_option('--stack', dest="stack_name", default=None, type="string",
                     help="Specify stack name for the stack versions that needs 
to be enabled")
-  parser.add_option("-d", "--from-date", dest="cleanup_from_date", 
default=None, type="string", help="Specify date for the cleanup process in 
'yyyy-MM-dd' format")
+  parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, 
type="string", help="Specify date for the database purge process in 
'yyyy-MM-dd' format")
   add_parser_options('--mpack',
       default=None,
       help="Specify the path for management pack to be installed/upgraded",
@@ -762,7 +762,7 @@ def create_user_action_map(args, options):
         CHECK_DATABASE_ACTION: UserAction(check_database, options),
         ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
         SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
-        DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
+        DB_PURGE_ACTION: UserAction(database_purge, options),
         INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
         UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
         UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index 6e16bc5..2611141 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -37,7 +37,7 @@ DB_CLEANUP_CMD = "{0} -cp {1} 
org.apache.ambari.server.cleanup.CleanupDriver --c
 #
 # Run the db cleanup process
 #
-def run_db_cleanup(options):
+def run_db_purge(options):
 
     if validate_args(options):
         return 1
@@ -50,18 +50,18 @@ def run_db_cleanup(options):
       confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm 
you have made a backup of the Ambari Server database [y/n]".format(
               db_title), True)
       if not confirmBackup:
-          print_info_msg("Ambari Server Database cleanup aborted")
+          print_info_msg("Ambari Server Database purge aborted")
           return 0
 
       if status:
-          print_error_msg("The database cleanup cannot proceed while Ambari 
Server is running. Please shut down Ambari first.")
+          print_error_msg("The database purge historical data cannot proceed 
while Ambari Server is running. Please shut down Ambari first.")
           return 1
 
       confirm = get_YN_input(
-          "Ambari server is using db type {0}. Cleanable database entries 
older than {1} will be cleaned up. Proceed [y/n]".format(
-              db_title, options.cleanup_from_date), True)
+          "Ambari server is using db type {0}. Cleanable database entries 
older than {1} will be purged. Proceed [y/n]".format(
+              db_title, options.purge_from_date), True)
       if not confirm:
-          print_info_msg("Ambari Server Database cleanup aborted")
+          print_info_msg("Ambari Server Database purge aborted")
           return 0
 
 
@@ -81,31 +81,31 @@ def run_db_cleanup(options):
     current_user = ensure_can_start_under_current_user(ambari_user)
     environ = generate_env(options, ambari_user, current_user)
 
-    print "Cleaning up the database ..."
-    command = DB_CLEANUP_CMD.format(jdk_path, class_path, 
options.cluster_name, options.cleanup_from_date)
+    print "Purging historical data from the database ..."
+    command = DB_CLEANUP_CMD.format(jdk_path, class_path, 
options.cluster_name, options.purge_from_date)
     (retcode, stdout, stderr) = run_os_command(command, env=environ)
 
     print_info_msg("Return code from database cleanup command, retcode = " + 
str(retcode))
 
     if stdout:
-        print "Console output from database cleanup command:"
+        print "Console output from database purge-history command:"
         print stdout
         print
     if stderr:
-        print "Error output from database cleanup command:"
+        print "Error output from database purge-history command:"
         print stderr
         print
     if retcode > 0:
-        print_error_msg("Error wncountered while cleaning up the Ambari Server 
Database. Check the ambari-server.log for details.")
+        print_error_msg("Error encountered while purging the Ambari Server 
Database. Check the ambari-server.log for details.")
     else:
-        print "Cleanup completed. Check the ambari-server.log for details."
+        print "Purging historical data completed. Check the ambari-server.log 
for details."
     return retcode
 
 #
-# Database cleanup
+# Database purge
 #
-def db_cleanup(options):
-    return run_db_cleanup(options)
+def db_purge(options):
+    return run_db_purge(options)
 
 
 def validate_args(options):
@@ -113,12 +113,12 @@ def validate_args(options):
         print_error_msg("Please provide the --cluster-name argument.")
         return 1
 
-    if not options.cleanup_from_date:
+    if not options.purge_from_date:
         print_error_msg("Please provide the --from-date argument.")
         return 1
 
     try:
-        datetime.datetime.strptime(options.cleanup_from_date, "%Y-%m-%d")
+        datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
     except ValueError as e:
         print_error_msg("The --from-date argument has an invalid format. 
{0}".format(e.args[0]))
         return 1;

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari_server/setupActions.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/setupActions.py b/ambari-server/src/main/python/ambari_server/setupActions.py
index 358bfc9..142a4d7 100644
--- a/ambari-server/src/main/python/ambari_server/setupActions.py
+++ b/ambari-server/src/main/python/ambari_server/setupActions.py
@@ -43,7 +43,7 @@ BACKUP_ACTION = "backup"
 RESTORE_ACTION = "restore"
 SETUP_JCE_ACTION = "setup-jce"
 ENABLE_STACK_ACTION = "enable-stack"
-DB_CLEANUP_ACTION = "db-cleanup"
+DB_PURGE_ACTION = "db-purge-history"
 INSTALL_MPACK_ACTION = "install-mpack"
 UNINSTALL_MPACK_ACTION = "uninstall-mpack"
 UPGRADE_MPACK_ACTION = "upgrade-mpack"

http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
index 7de5aae..33c2844 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
@@ -32,6 +32,7 @@ import junit.framework.Assert;
 
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
@@ -71,7 +72,7 @@ public class CleanupServiceImplTest {
     cleanupServiceImpl = new CleanupServiceImpl(cleanables);
 
     // WHEN
-    long rows = cleanupServiceImpl.cleanup(cleanupPolicy);
+    cleanupServiceImpl.cleanup(cleanupPolicy);
 
     // THEN
     Assert.assertNotNull("The argument is null", 
timeBasedCleanupPolicyCapture.getValue());
@@ -79,4 +80,41 @@ public class CleanupServiceImplTest {
     Assert.assertEquals("The to date is wrong!", 
timeBasedCleanupPolicyCapture.getValue().getToDateInMillis(), 
FROM_DATE_TIMESTAMP);
   }
 
+  @Test
+  public void testAffectedRowsNoError() throws Exception {
+    // GIVEN
+    cleanables = new HashSet<>();
+    cleanables.add(cleanableDao);
+    expect(cleanableDao.cleanup(cleanupPolicy)).andReturn(2L);
+
+    replay(cleanableDao);
+    cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+    // WHEN
+    CleanupService.CleanupResult res = 
cleanupServiceImpl.cleanup(cleanupPolicy);
+
+    // THEN
+    Assert.assertEquals("The affected rows count is wrong", 2L, 
res.getAffectedRows());
+    Assert.assertEquals("The error count is wrong", 0L, res.getErrorCount());
+  }
+
+  @Test
+  public void testAffectedRowsWithErrors() throws Exception {
+    // GIVEN
+    cleanables = new HashSet<>();
+    cleanables.add(cleanableDao);
+    expect(cleanableDao.cleanup(cleanupPolicy)).andThrow(new 
RuntimeException());
+
+
+    replay(cleanableDao);
+    cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+    // WHEN
+    CleanupService.CleanupResult res = 
cleanupServiceImpl.cleanup(cleanupPolicy);
+
+    // THEN
+    Assert.assertEquals("The affected rows count is wrong", 0L, 
res.getAffectedRows());
+    Assert.assertEquals("The error count is wrong", 1L, res.getErrorCount());
+  }
+
 }
\ No newline at end of file

Reply via email to