Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 24dcb1c85 -> c23ef5095


AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables. (vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c23ef509
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c23ef509
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c23ef509

Branch: refs/heads/branch-2.5
Commit: c23ef5095bbfffe910115b1cab4017f21f377e0a
Parents: 24dcb1c
Author: Vitaly Brodetskyi <vbrodets...@hortonworks.com>
Authored: Wed Apr 12 16:30:34 2017 +0300
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Thu Jun 8 11:51:51 2017 +0200

----------------------------------------------------------------------
 .../checks/DatabaseConsistencyCheckHelper.java  | 111 ++++++++
 .../apache/ambari/server/orm/DBAccessor.java    |   6 +
 .../ambari/server/orm/DBAccessorImpl.java       |   5 +
 .../server/orm/dao/HostRoleCommandDAO.java      |  16 ++
 .../ambari/server/orm/dao/RequestDAO.java       | 260 ++++++++++++++++++-
 .../server/orm/dao/TopologyHostTaskDAO.java     |  11 +
 .../orm/dao/TopologyLogicalRequestDAO.java      |  12 +
 .../server/orm/dao/TopologyLogicalTaskDAO.java  |  12 +
 .../orm/entities/ExecutionCommandEntity.java    |  15 +-
 .../orm/entities/HostRoleCommandEntity.java     |  10 +-
 .../server/orm/entities/RequestEntity.java      |   6 +
 .../entities/RequestOperationLevelEntity.java   |   4 +-
 .../entities/RequestResourceFilterEntity.java   |   5 +
 .../orm/entities/RoleSuccessCriteriaEntity.java |   5 +
 .../ambari/server/orm/entities/StageEntity.java |   6 +-
 .../orm/entities/TopologyHostRequestEntity.java |   5 +
 .../orm/entities/TopologyHostTaskEntity.java    |  15 +-
 .../entities/TopologyLogicalRequestEntity.java  |   5 +
 .../orm/entities/TopologyLogicalTaskEntity.java |  30 ++-
 .../server/orm/entities/UpgradeEntity.java      |   2 +
 .../server/orm/entities/UpgradeItemEntity.java  |   5 +
 ambari-server/src/main/python/ambari-server.py  |  10 +-
 .../src/main/python/ambari_server/dbCleanup.py  |  37 +--
 .../DatabaseConsistencyCheckHelperTest.java     |  66 +++++
 24 files changed, 623 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index 4513fac..70c3661 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -177,6 +178,7 @@ public class DatabaseConsistencyCheckHelper {
       checkHostComponentStatesCountEqualsHostComponentsDesiredStates();
       checkServiceConfigs();
       checkTopologyTables();
+      checkForLargeTables();
       LOG.info("******************************* Check database completed 
*******************************");
       return checkResult;
     }
@@ -269,6 +271,115 @@ public class DatabaseConsistencyCheckHelper {
   }
 
   /**
+   * Checks whether any Ambari database table has grown beyond the configured size limit.
+   * The table size is read from the database schema information first; if that is not
+   * available, the table row count is retrieved and compared against the row count limit.
+   */
+  static void checkForLargeTables() {
+    LOG.info("Checking for tables with large physical size");
+
+    ensureConnection();
+
+    DBAccessor.DbType dbType = dbAccessor.getDbType();
+    String schemaName = dbAccessor.getDbSchema();
+
+    String GET_TABLE_SIZE_IN_BYTES_POSTGRESQL = "SELECT 
pg_total_relation_size('%s') \"Table Size\"";
+    String GET_TABLE_SIZE_IN_BYTES_MYSQL = "SELECT (data_length + 
index_length) \"Table Size\" FROM information_schema.TABLES WHERE table_schema 
= \"" + schemaName + "\" AND table_name =\"%s\"";
+    String GET_TABLE_SIZE_IN_BYTES_ORACLE = "SELECT bytes \"Table Size\" FROM 
user_segments WHERE segment_type='TABLE' AND segment_name='%s'";
+    String GET_ROW_COUNT_QUERY = "SELECT COUNT(*) FROM %s";
+
+    Map<DBAccessor.DbType, String> tableSizeQueryMap = new HashMap<>();
+    tableSizeQueryMap.put(DBAccessor.DbType.POSTGRES, 
GET_TABLE_SIZE_IN_BYTES_POSTGRESQL);
+    tableSizeQueryMap.put(DBAccessor.DbType.MYSQL, 
GET_TABLE_SIZE_IN_BYTES_MYSQL);
+    tableSizeQueryMap.put(DBAccessor.DbType.ORACLE, 
GET_TABLE_SIZE_IN_BYTES_ORACLE);
+
+    List<String> tablesToCheck = Arrays.asList("host_role_command", 
"execution_command", "stage", "request", "alert_history");
+
+    final double TABLE_SIZE_LIMIT_MB = 3000.0;
+    final int TABLE_ROW_COUNT_LIMIT = 3000000;
+
+    String findTableSizeQuery = tableSizeQueryMap.get(dbType);
+
+    if (dbType == DBAccessor.DbType.ORACLE) {
+      for (int i = 0;i < tablesToCheck.size(); i++) {
+        tablesToCheck.set(i, tablesToCheck.get(i).toUpperCase());
+      }
+    }
+
+    for (String tableName : tablesToCheck) {
+
+      ResultSet rs = null;
+      Statement statement = null;
+      Double tableSizeInMB = null;
+      Long tableSizeInBytes = null;
+      int tableRowCount = -1;
+
+      try {
+        statement = 
connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, 
ResultSet.CONCUR_UPDATABLE);
+        rs = statement.executeQuery(String.format(findTableSizeQuery, 
tableName));
+        if (rs != null) {
+          while (rs.next()) {
+            tableSizeInBytes = rs.getLong(1);
+            if (tableSizeInBytes != null) {
+              tableSizeInMB = tableSizeInBytes / 1024.0 / 1024.0;
+            }
+          }
+        }
+
+        if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
+          warning("The database table {} is currently {} MB (limit is {}) and 
may impact performance. It is recommended " +
+                  "that you reduce its size by executing \"ambari-server 
db-cleanup\".",
+                  tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
+        } else if (tableSizeInMB != null && tableSizeInMB < 
TABLE_SIZE_LIMIT_MB) {
+          LOG.info(String.format("The database table %s is currently %.3f MB 
and is within normal limits (%.3f)",
+                  tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB));
+        } else {
+          throw new Exception();
+        }
+      } catch (Exception e) {
+        LOG.error(String.format("Failed to get %s table size from database, 
will check row count: ", tableName), e);
+        try {
+          rs = statement.executeQuery(String.format(GET_ROW_COUNT_QUERY, 
tableName));
+          if (rs != null) {
+            while (rs.next()) {
+              tableRowCount = rs.getInt(1);
+            }
+          }
+
+          if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
+            warning("The database table {} currently has {} rows (limit is {}) 
and may impact performance. It is " +
+                    "recommended that you reduce its size by executing 
\"ambari-server db-cleanup\".",
+                    tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
+          } else if (tableRowCount != -1 && tableRowCount < 
TABLE_ROW_COUNT_LIMIT) {
+            LOG.info(String.format("The database table %s currently has %d 
rows and is within normal limits (%d)", tableName, tableRowCount, 
TABLE_ROW_COUNT_LIMIT));
+          } else {
+            throw new SQLException();
+          }
+        } catch (SQLException ex) {
+          LOG.error(String.format("Failed to get %s row count: ", tableName), ex);
+        }
+      } finally {
+        if (rs != null) {
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            LOG.error("Exception occurred during result set closing procedure: 
", e);
+          }
+        }
+
+        if (statement != null) {
+          try {
+            statement.close();
+          } catch (SQLException e) {
+            LOG.error("Exception occurred during statement closing procedure: 
", e);
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
   * This method checks if any config type in clusterconfigmapping table, has
   * more than one versions selected. If config version is selected(in selected 
column = 1),
   * it means that this version of config is actual. So, if any config type has 
more
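
For context, checkForLargeTables() follows a "table size first, row count as fallback" pattern: a dialect-specific size query is attempted, and COUNT(*) is only used when that fails. The following is a minimal, self-contained sketch of that pattern over plain JDBC; the limits and the PostgreSQL size query are taken from the patch, while the class and method names (TableSizeCheckSketch, checkTable) are illustrative and not part of the commit.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative sketch only: the "size query first, row count fallback" pattern used by checkForLargeTables().
class TableSizeCheckSketch {

  private static final double TABLE_SIZE_LIMIT_MB = 3000.0;   // same limit as in the patch
  private static final int TABLE_ROW_COUNT_LIMIT = 3000000;   // same limit as in the patch

  // PostgreSQL variant taken from the patch; MySQL and Oracle use their own size queries.
  private static final String PG_TABLE_SIZE = "SELECT pg_total_relation_size('%s')";
  private static final String ROW_COUNT = "SELECT COUNT(*) FROM %s";

  static void checkTable(Connection connection, String tableName) throws SQLException {
    try (Statement statement = connection.createStatement();
         ResultSet rs = statement.executeQuery(String.format(PG_TABLE_SIZE, tableName))) {
      if (rs.next()) {
        double sizeMb = rs.getLong(1) / 1024.0 / 1024.0;
        if (sizeMb > TABLE_SIZE_LIMIT_MB) {
          System.out.printf("Table %s is %.1f MB; consider running \"ambari-server db-cleanup\"%n",
              tableName, sizeMb);
        }
        return; // size information was available, no need for the fallback
      }
    } catch (SQLException sizeQueryFailed) {
      // Size information not available (e.g. missing privileges); fall through to the row count check.
    }

    try (Statement statement = connection.createStatement();
         ResultSet rs = statement.executeQuery(String.format(ROW_COUNT, tableName))) {
      if (rs.next() && rs.getInt(1) > TABLE_ROW_COUNT_LIMIT) {
        System.out.printf("Table %s has more than %d rows; consider running \"ambari-server db-cleanup\"%n",
            tableName, TABLE_ROW_COUNT_LIMIT);
      }
    }
  }
}

The real implementation additionally selects the size query per DBAccessor.DbType and upper-cases the table names for Oracle.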

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
index fac524c..c637c05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
@@ -663,6 +663,12 @@ public interface DBAccessor {
   DbType getDbType();
 
   /**
+   * Get the database schema name
+   * @return the database schema name
+   */
+  String getDbSchema();
+
+  /**
    * Capture column type
    */
   class DBColumnInfo {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
index c5b116c..0e2237c 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
@@ -233,6 +233,11 @@ public class DBAccessorImpl implements DBAccessor {
   }
 
   @Override
+  public String getDbSchema() {
+    return dbSchema;
+  }
+
+  @Override
   public boolean tableHasData(String tableName) throws SQLException {
     String query = "SELECT count(*) from " + tableName;
     Statement statement = getConnection().createStatement();

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 7582957..b9e1fab 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -932,4 +932,20 @@ public class HostRoleCommandDAO {
       return HostRoleCommandEntity_.getPredicateMapping().get(propertyId);
     }
   }
+
+  public List<Long> 
findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+    EntityManager entityManager = entityManagerProvider.get();
+    List<Long> taskIds = new ArrayList<Long>();
+    for (RequestDAO.StageEntityPK requestIds : requestStageIds) {
+      TypedQuery<Long> hostRoleCommandQuery =
+              
entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds",
 Long.class);
+
+      hostRoleCommandQuery.setParameter("requestId", 
requestIds.getRequestId());
+      hostRoleCommandQuery.setParameter("stageId", requestIds.getStageId());
+
+      taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
+    }
+
+    return taskIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 1c4d0a3..38c0977 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -19,27 +19,54 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy;
 import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity;
 import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.state.Clusters;
 import org.eclipse.persistence.config.HintValues;
 import org.eclipse.persistence.config.QueryHints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
 
 @Singleton
-public class RequestDAO {
+public class RequestDAO implements Cleanable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class);
+
+
+  private static final int BATCH_SIZE = 999;
+
   /**
    * SQL template to retrieve all request IDs, sorted by the ID.
    */
@@ -64,6 +91,27 @@ public class RequestDAO {
   @Inject
   DaoUtils daoUtils;
 
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private StageDAO stageDAO;
+
+  @Inject
+  private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
+
+  @Inject
+  private TopologyHostTaskDAO topologyHostTaskDAO;
+
+  @Inject
+  private TopologyLogicalRequestDAO topologyLogicalRequestDAO;
+
+  @Inject
+  private TopologyRequestDAO topologyRequestDAO;
+
   @RequiresSession
   public RequestEntity findByPK(Long requestId) {
     return entityManagerProvider.get().find(RequestEntity.class, requestId);
@@ -189,4 +237,214 @@ public class RequestDAO {
 
     return daoUtils.selectList(query);
   }
+
+  public static final class StageEntityPK {
+    private Long requestId;
+    private Long stageId;
+
+    public StageEntityPK(Long requestId, Long stageId) {
+      this.requestId = requestId;
+      this.stageId = stageId;
+    }
+
+    public Long getStageId() {
+      return stageId;
+    }
+
+    public void setStageId(Long stageId) {
+      this.stageId = stageId;
+    }
+
+    public Long getRequestId() {
+      return requestId;
+    }
+
+    public void setRequestId(Long requestId) {
+      this.requestId = requestId;
+    }
+  }
+
+  /**
+   * Search for all request ids in Upgrade table
+   * @return the list of request ids
+   */
+  private List<Long> findAllRequestIdsFromUpgrade() {
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<Long> upgradeQuery =
+            entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", 
Long.class);
+
+    return daoUtils.selectList(upgradeQuery);
+  }
+
+  /**
+   * Search for all request and stage ids in Request and Stage tables
+   * @return the list of request/stage ids
+   */
+  public List<StageEntityPK> findRequestAndStageIdsInClusterBeforeDate(Long 
clusterId, long  beforeDateMillis) {
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<StageEntityPK> requestQuery =
+            
entityManager.createNamedQuery("RequestEntity.findRequestStageIdsInClusterBeforeDate",
 StageEntityPK.class);
+
+    requestQuery.setParameter("clusterId", clusterId);
+    requestQuery.setParameter("beforeDate", beforeDateMillis);
+
+    return daoUtils.selectList(requestQuery);
+  }
+
+  /**
+   * Removes the entities identified by the passed ids. To keep individual statements small,
+   * the ids are processed in batches of limited size.
+   * @param ids              ids of the rows to remove
+   * @param paramName        name of the query parameter that carries the ids (e.g. taskIds, stageIds)
+   * @param entityName       name of the entity being removed
+   * @param beforeDateMillis user-supplied cutoff timestamp (entities created before it are removed);
+   *                         used here for logging only
+   * @param entityQuery      name of the NamedQuery used to remove the entities
+   * @param type             entity class used to type the query result
+   * @return                 number of rows removed
+   */
+  @Transactional
+  protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String 
entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, 
new Date(beforeDateMillis)));
+    EntityManager entityManager = entityManagerProvider.get();
+    int affectedRows = 0;
+    // Batch delete
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+    if (ids != null && !ids.isEmpty()) {
+      for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
+        int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + 
BATCH_SIZE);
+        List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow);
+        LOG.info("Deleting " + entityName + " entity batch with task ids: " +
+                idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 
1));
+        query.setParameter(paramName, idsSubList);
+        affectedRows += query.executeUpdate();
+      }
+    }
+
+    return affectedRows;
+  }
+
+  /**
+   * Removes the entities identified by the passed (requestId, stageId) pairs. To keep individual
+   * statements small, the pairs are processed in batches of limited size.
+   * @param ids              request/stage id pairs of the rows to remove
+   * @param paramNames       the two query parameter names, bound in the order stageId, then requestId
+   * @param entityName       name of the entity being removed
+   * @param beforeDateMillis user-supplied cutoff timestamp (entities created before it are removed);
+   *                         used here for logging only
+   * @param entityQuery      name of the NamedQuery used to remove the entities
+   * @param type             entity class used to type the query result
+   * @return                 number of rows removed
+   */
+  @Transactional
+  protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, 
LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, 
new Date(beforeDateMillis)));
+    EntityManager entityManager = entityManagerProvider.get();
+    int affectedRows = 0;
+    // Batch delete
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+    if (ids != null && !ids.isEmpty()) {
+      for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
+        int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + 
BATCH_SIZE);
+        List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, 
endRow);
+        LOG.info("Deleting " + entityName + " entity batch with task ids: " +
+                idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 
1));
+        for (StageEntityPK requestIds : idsSubList) {
+          query.setParameter(paramNames.get(0), requestIds.getStageId());
+          query.setParameter(paramNames.get(1), requestIds.getRequestId());
+          affectedRows += query.executeUpdate();
+        }
+      }
+    }
+
+    return affectedRows;
+  }
+
+  @Transactional
+  @Override
+  public long cleanup(TimeBasedCleanupPolicy policy) {
+    long affectedRows = 0;
+    Long clusterId = null;
+    try {
+      clusterId = 
m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+      // find request and stage ids that were created before date populated by 
user.
+      List<StageEntityPK> requestStageIds = 
findRequestAndStageIdsInClusterBeforeDate(clusterId, 
policy.getToDateInMillis());
+
+      // find request ids from Upgrade table and exclude these ids from
+      // request ids set that we already have. We don't want to make any 
changes for upgrade
+      Set<Long> requestIdsFromUpgrade = 
Sets.newHashSet(findAllRequestIdsFromUpgrade());
+      Iterator<StageEntityPK> requestStageIdsIterator =  
requestStageIds.iterator();
+      while (requestStageIdsIterator.hasNext()) {
+        StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
+        if 
(requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
+          requestStageIdsIterator.remove();
+        }
+      }
+
+
+      Set<Long> requestIds = new HashSet<>();
+      for (StageEntityPK ids : requestStageIds) {
+        requestIds.add(ids.getRequestId());
+      }
+
+      // find task ids using request stage ids
+      Set<Long> taskIds = 
Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+      LinkedList<String> params = new LinkedList<>();
+      params.add("stageId");
+      params.add("requestId");
+
+      // find host task ids, to find related host requests and also to remove 
needed host tasks
+      List<Long> hostTaskIds = new ArrayList<>();
+      if (taskIds != null && !taskIds.isEmpty()) {
+        hostTaskIds = 
topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+      }
+
+      // find host request ids by host task ids to remove later needed host 
requests
+      List<Long> hostRequestIds = new ArrayList<>();
+      if (!hostTaskIds.isEmpty()) {
+        hostRequestIds = 
topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+      }
+
+      List<Long> topologyRequestIds = new ArrayList<>();
+      if (!hostRequestIds.isEmpty()) {
+        topologyRequestIds = 
topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
+      }
+
+
+      //removing all entities one by one according to their relations using 
stage, task and request ids
+      affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", 
policy.getToDateInMillis(),
+              "ExecutionCommandEntity.removeByTaskIds", 
ExecutionCommandEntity.class);
+      affectedRows += cleanTableByIds(taskIds, "taskIds", 
"TopologyLogicalTask", policy.getToDateInMillis(),
+              "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", 
TopologyLogicalTaskEntity.class);
+      affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), 
"hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+              "TopologyHostTaskEntity.removeByTaskIds", 
TopologyHostTaskEntity.class);
+      affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), 
"hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+              "TopologyHostRequestEntity.removeByIds", 
TopologyHostRequestEntity.class);
+      for (Long topologyRequestId : topologyRequestIds) {
+        topologyRequestDAO.removeByPK(topologyRequestId);
+      }
+      affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", 
policy.getToDateInMillis(),
+              "HostRoleCommandEntity.removeByTaskIds", 
HostRoleCommandEntity.class);
+      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, 
"RoleSuccessCriteria", policy.getToDateInMillis(),
+              "RoleSuccessCriteriaEntity.removeByRequestStageIds", 
RoleSuccessCriteriaEntity.class);
+      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, 
"Stage", policy.getToDateInMillis(),
+              "StageEntity.removeByRequestStageIds", StageEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", 
"RequestResourceFilter", policy.getToDateInMillis(),
+              "RequestResourceFilterEntity.removeByRequestIds", 
RequestResourceFilterEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", 
"RequestOperationLevel", policy.getToDateInMillis(),
+              "RequestOperationLevelEntity.removeByRequestIds", 
RequestOperationLevelEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", 
policy.getToDateInMillis(),
+              "RequestEntity.removeByRequestIds", RequestEntity.class);
+
+    } catch (AmbariException e) {
+      LOG.error("Error while looking up cluster with name: {}", 
policy.getClusterName(), e);
+      throw new IllegalStateException(e);
+    }
+
+    return affectedRows;
+  }
 }
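
cleanup() above removes rows through bulk DELETE named queries, and the cleanTableByIds()/cleanTableByStageEntityPK() helpers keep every IN clause under BATCH_SIZE values. A condensed sketch of that chunked bulk-delete pattern follows; the helper class and method names are illustrative, while the batch size and the named query and parameter names mentioned afterwards come from the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import javax.persistence.EntityManager;
import javax.persistence.Query;

// Illustrative sketch only: the chunked bulk-delete pattern behind cleanTableByIds().
class BatchDeleteSketch {

  private static final int BATCH_SIZE = 999; // same batch size as in the patch

  // Deletes rows whose ids are in the given set, issuing one bulk DELETE per batch so that a
  // single IN clause never carries more than BATCH_SIZE values.
  static int deleteByIds(EntityManager entityManager, String namedQuery, String paramName, Set<Long> ids) {
    int affectedRows = 0;
    List<Long> allIds = new ArrayList<>(ids);
    Query query = entityManager.createNamedQuery(namedQuery);
    for (int i = 0; i < allIds.size(); i += BATCH_SIZE) {
      int end = Math.min(i + BATCH_SIZE, allIds.size());
      query.setParameter(paramName, allIds.subList(i, end));
      affectedRows += query.executeUpdate();
    }
    return affectedRows;
  }
}

A call such as deleteByIds(entityManager, "ExecutionCommandEntity.removeByTaskIds", "taskIds", taskIds) mirrors the first deletion issued from cleanup().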

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 85a4f5f..eea8032 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -52,6 +52,17 @@ public class TopologyHostTaskDAO {
   }
 
   @RequiresSession
+  public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<Long> topologyHostTaskQuery =
+            
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
 Long.class);
+
+    topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
+
+    return daoUtils.selectList(topologyHostTaskQuery);
+  }
+
+  @RequiresSession
   public List<TopologyHostTaskEntity> findAll() {
     return daoUtils.selectAll(entityManagerProvider.get(), 
TopologyHostTaskEntity.class);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index e6dcb69..32a38da 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
 import java.util.List;
 
 @Singleton
@@ -58,4 +59,15 @@ public class TopologyLogicalRequestDAO {
   public void remove(TopologyLogicalRequestEntity requestEntity) {
     entityManagerProvider.get().remove(requestEntity);
   }
+
+  @RequiresSession
+  public List<Long> findRequestIdsByIds(List<Long> ids) {
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<Long> topologyLogicalRequestQuery =
+            
entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", 
Long.class);
+
+    topologyLogicalRequestQuery.setParameter("ids", ids);
+
+    return daoUtils.selectList(topologyLogicalRequestQuery);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index f0331cc..3a72aed 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -25,6 +25,7 @@ import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
 import java.util.List;
 
 @Singleton
@@ -41,6 +42,17 @@ public class TopologyLogicalTaskDAO {
   }
 
   @RequiresSession
+  public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> 
physicalTaskIds) {
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<Long> topologyHostTaskQuery =
+            
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds",
 Long.class);
+
+    topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
+
+    return daoUtils.selectList(topologyHostTaskQuery);
+  }
+
+  @RequiresSession
   public List<TopologyLogicalTaskEntity> findAll() {
     return daoUtils.selectAll(entityManagerProvider.get(), 
TopologyLogicalTaskEntity.class);
   }
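
The finder methods added to the topology DAOs (findHostTaskIdsByPhysicalTaskIds, findHostRequestIdsByHostTaskIds, findRequestIdsByIds) share one shape: a named projection query returning Long ids with a collection bound to its IN-clause parameter. A generic sketch of that shape, with illustrative class and method names:

import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;

// Illustrative sketch only: the id-projection finder shape shared by the new DAO methods.
class IdProjectionSketch {

  // Runs a named query that selects Long ids, binding the given collection to the IN-clause parameter.
  static List<Long> findIds(EntityManager entityManager, String namedQuery,
                            String paramName, List<Long> inValues) {
    TypedQuery<Long> query = entityManager.createNamedQuery(namedQuery, Long.class);
    query.setParameter(paramName, inValues);
    return query.getResultList();
  }
}

cleanup() chains these finders: physical task ids resolve to host task ids, then to host request ids, and finally to topology request ids before the corresponding rows are removed.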

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
index 25d830b..7015709 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
@@ -18,11 +18,24 @@
 
 package org.apache.ambari.server.orm.entities;
 
-import javax.persistence.*;
 import java.util.Arrays;
 
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToOne;
+import javax.persistence.Table;
+
 @Table(name = "execution_command")
 @Entity
+@NamedQueries({
+    @NamedQuery(name = "ExecutionCommandEntity.removeByTaskIds", query = 
"DELETE FROM ExecutionCommandEntity command WHERE command.taskId IN :taskIds")
+})
 public class ExecutionCommandEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 3d946f5..7ac60a9 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang.ArrayUtils;
     , initialValue = 1
 )
 @NamedQueries({
+    @NamedQuery(name = "HostRoleCommandEntity.findTaskIdsByRequestStageIds", 
query = "SELECT command.taskId FROM HostRoleCommandEntity command WHERE 
command.stageId = :stageId AND command.requestId = :requestId"),
     @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", 
query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE 
command.status IN :statuses"),
     @NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", 
query="SELECT task FROM HostRoleCommandEntity task WHERE 
task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId 
ASC"),
     @NamedQuery(name = 
"HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", query = "SELECT task 
FROM HostRoleCommandEntity task WHERE task.requestId = :requestId AND 
task.status IN :statuses ORDER BY task.taskId DESC"),
@@ -71,12 +72,9 @@ import org.apache.commons.lang.ArrayUtils;
     @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", 
query = "SELECT command FROM HostRoleCommandEntity command WHERE 
command.requestId = :requestId AND command.stageId >= :minStageId AND 
command.stageId <= :maxStageId AND command.status = :status"),
     @NamedQuery(name = 
"HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE 
HostRoleCommandEntity command SET command.autoSkipOnFailure = 
:autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand 
<> :roleCommand"),
     @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", 
query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = 
:autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand 
= :roleCommand"),
-    @NamedQuery(
-        name = "HostRoleCommandEntity.findHostsByCommandStatus",
-        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity 
command, HostEntity host WHERE (command.requestId >= 
:iLowestRequestIdInProgress AND command.requestId <= 
:iHighestRequestIdInProgress) AND command.status IN :statuses AND 
command.hostId = host.hostId AND host.hostName IS NOT NULL"),
-    @NamedQuery(
-        name = "HostRoleCommandEntity.getBlockingHostsForRequest",
-        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity 
command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive 
AND command.requestId < :upperRequestIdExclusive AND command.status IN 
:statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId 
AND host.hostName IS NOT NULL")
+    @NamedQuery(name = "HostRoleCommandEntity.removeByTaskIds", query = 
"DELETE FROM HostRoleCommandEntity command WHERE command.taskId IN :taskIds"),
+    @NamedQuery(name = "HostRoleCommandEntity.findHostsByCommandStatus", query 
= "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, 
HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND 
command.requestId <= :iHighestRequestIdInProgress) AND command.status IN 
:statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
+    @NamedQuery(name = "HostRoleCommandEntity.getBlockingHostsForRequest", 
query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, 
HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND 
command.requestId < :upperRequestIdExclusive AND command.status IN :statuses 
AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND 
host.hostName IS NOT NULL")
 
 })
 public class HostRoleCommandEntity {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index e46bb51..45fb631 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -32,6 +32,8 @@ import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.Lob;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
@@ -39,6 +41,10 @@ import java.util.Collection;
 
 @Table(name = "request")
 @Entity
+@NamedQueries({
+  @NamedQuery(name = "RequestEntity.findRequestStageIdsInClusterBeforeDate", 
query = "SELECT NEW 
org.apache.ambari.server.orm.dao.RequestDAO.StageEntityPK(request.requestId, 
stage.stageId) FROM RequestEntity request JOIN StageEntity stage ON 
request.requestId = stage.requestId WHERE request.clusterId = :clusterId AND 
request.createTime <= :beforeDate"),
+  @NamedQuery(name = "RequestEntity.removeByRequestIds", query = "DELETE FROM 
RequestEntity request WHERE request.requestId IN :requestIds")
+})
 public class RequestEntity {
 
   @Column(name = "request_id")
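
The new RequestEntity.findRequestStageIdsInClusterBeforeDate query uses a JPQL constructor expression (SELECT NEW ...) so each row is materialized directly as a RequestDAO.StageEntityPK rather than an Object[] tuple. Below is a minimal sketch of the same technique using a hypothetical value class in a hypothetical package named sketch; only the query text mirrors the patch, everything else is illustrative.

package sketch; // hypothetical package, used only so the SELECT NEW target can be fully qualified

import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;

// Illustrative sketch only: materializing query rows into a plain value class via SELECT NEW.
public class RequestStageId {

  private final Long requestId;
  private final Long stageId;

  // JPA invokes this constructor once per result row.
  public RequestStageId(Long requestId, Long stageId) {
    this.requestId = requestId;
    this.stageId = stageId;
  }

  public Long getRequestId() { return requestId; }
  public Long getStageId()   { return stageId; }

  static List<RequestStageId> findBefore(EntityManager em, Long clusterId, long beforeDateMillis) {
    TypedQuery<RequestStageId> query = em.createQuery(
        "SELECT NEW sketch.RequestStageId(request.requestId, stage.stageId) "
            + "FROM RequestEntity request JOIN StageEntity stage ON request.requestId = stage.requestId "
            + "WHERE request.clusterId = :clusterId AND request.createTime <= :beforeDate",
        RequestStageId.class);
    query.setParameter("clusterId", clusterId);
    query.setParameter("beforeDate", beforeDateMillis);
    return query.getResultList();
  }
}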

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
index c03816e..64af92a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
@@ -42,7 +42,9 @@ import javax.persistence.TableGenerator;
 @NamedQueries({
     @NamedQuery(name = "requestOperationLevelByHostId", query =
         "SELECT requestOperationLevel FROM RequestOperationLevelEntity 
requestOperationLevel " +
-            "WHERE requestOperationLevel.hostId=:hostId")
+            "WHERE requestOperationLevel.hostId=:hostId"),
+    @NamedQuery(name = "RequestOperationLevelEntity.removeByRequestIds",
+        query = "DELETE FROM RequestOperationLevelEntity requestOperationLevel 
WHERE requestOperationLevel.requestId IN :requestIds")
 })
 public class RequestOperationLevelEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
index 8ee41d2..9597db1 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.Lob;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
 
@@ -36,6 +38,9 @@ import javax.persistence.TableGenerator;
   , pkColumnValue = "resourcefilter_id_seq"
   , initialValue = 1
 )
+@NamedQueries({
+  @NamedQuery(name = "RequestResourceFilterEntity.removeByRequestIds", query = 
"DELETE FROM RequestResourceFilterEntity filter WHERE filter.requestId IN 
:requestIds")
+})
 public class RequestResourceFilterEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
index 3386c24..66e7fd8 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.IdClass;
 import javax.persistence.JoinColumn;
 import javax.persistence.JoinColumns;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 
 import org.apache.ambari.server.Role;
@@ -33,6 +35,9 @@ import org.apache.ambari.server.Role;
 
@IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class)
 @Table(name = "role_success_criteria")
 @Entity
+@NamedQueries({
+  @NamedQuery(name = "RoleSuccessCriteriaEntity.removeByRequestStageIds", 
query = "DELETE FROM RoleSuccessCriteriaEntity criteria WHERE criteria.stageId 
= :stageId AND criteria.requestId = :requestId")
+})
 public class RoleSuccessCriteriaEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index aeafda0..3b755f6 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -49,7 +49,11 @@ import 
org.apache.ambari.server.actionmanager.CommandExecutionType;
         query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity 
stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId 
= stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId 
ORDER BY stage.requestId"),
     @NamedQuery(
         name = "StageEntity.findByRequestIdAndCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN 
(SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND 
roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId 
) ORDER BY stage.stageId") })
+        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN 
(SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND 
roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId 
) ORDER BY stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.removeByRequestStageIds",
+        query = "DELETE FROM StageEntity stage WHERE stage.stageId = :stageId 
AND stage.requestId = :requestId")
+})
 public class StageEntity {
 
   @Basic

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
index 4e05ea1..7abbd51 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
@@ -25,6 +25,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
@@ -32,6 +34,9 @@ import java.util.Collection;
 
 @Entity
 @Table(name = "topology_host_request")
+@NamedQueries({
+  @NamedQuery(name = "TopologyHostRequestEntity.removeByIds", query = "DELETE 
FROM TopologyHostRequestEntity topologyHostRequest WHERE topologyHostRequest.id 
IN :hostRequestIds")
+})
 public class TopologyHostRequestEntity {
   @Id
 //  @GeneratedValue(strategy = GenerationType.TABLE, generator = 
"topology_host_request_id_generator")

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index 49d3a97..37830b7 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -40,7 +40,11 @@ import java.util.Collection;
   pkColumnValue = "topology_host_task_id_seq", initialValue = 0)
 @NamedQueries({
   @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
-      query = "SELECT req FROM TopologyHostTaskEntity req WHERE 
req.topologyHostRequestEntity.id = :hostRequestId")
+      query = "SELECT req FROM TopologyHostTaskEntity req WHERE 
req.topologyHostRequestEntity.id = :hostRequestId"),
+  @NamedQuery(name = 
"TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
+      query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE 
tht.id IN :hostTaskIds"),
+  @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
+      query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN 
:hostTaskIds")
 })
 public class TopologyHostTaskEntity {
   @Id
@@ -51,6 +55,9 @@ public class TopologyHostTaskEntity {
   @Column(name = "type", length = 255, nullable = false)
   private String type;
 
+  @Column(name = "host_request_id", nullable = false, insertable = false, 
updatable = false)
+  private Long hostRequestId;
+
   @ManyToOne
   @JoinColumn(name = "host_request_id", referencedColumnName = "id", nullable 
= false)
   private TopologyHostRequestEntity topologyHostRequestEntity;
@@ -67,7 +74,11 @@ public class TopologyHostTaskEntity {
   }
 
   public Long getHostRequestId() {
-    return topologyHostRequestEntity != null ? 
topologyHostRequestEntity.getId() : null;
+    return hostRequestId;
+  }
+
+  public void setHostRequestId(Long hostRequestId) {
+    this.hostRequestId = hostRequestId;
   }
 
   public String getType() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 4d255b2..1536b80 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -25,6 +25,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
@@ -33,6 +35,9 @@ import java.util.Collection;
 
 @Entity
 @Table(name = "topology_logical_request")
+@NamedQueries({
+  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = 
"SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity 
logicalrequest WHERE logicalrequest.id IN :ids")
+})
 public class TopologyLogicalRequestEntity {
   @Id
   @Column(name = "id", nullable = false, updatable = false)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index c71d4e4..2954863 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -24,6 +24,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
@@ -33,6 +35,10 @@ import javax.persistence.TableGenerator;
 @TableGenerator(name = "topology_logical_task_id_generator", table = 
"ambari_sequences",
   pkColumnName = "sequence_name", valueColumnName = "sequence_value",
   pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
+@NamedQueries({
+  @NamedQuery(name = 
"TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT 
logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE 
logicaltask.physicalTaskId IN :physicalTaskIds"),
+  @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", 
query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE 
logicaltask.physicalTaskId IN :taskIds")
+})
 public class TopologyLogicalTaskEntity {
   @Id
   @GeneratedValue(strategy = GenerationType.TABLE, generator = 
"topology_logical_task_id_generator")
@@ -42,12 +48,18 @@ public class TopologyLogicalTaskEntity {
   @Column(name = "component", length = 255)
   private String componentName;
 
+  @Column(name = "host_task_id", nullable = false, insertable = false, 
updatable = false)
+  private Long hostTaskId;
+
+  @Column(name = "physical_task_id", nullable = false, insertable = false, 
updatable = false)
+  private Long physicalTaskId;
+
   @ManyToOne
   @JoinColumn(name = "host_task_id", referencedColumnName = "id", nullable = 
false)
   private TopologyHostTaskEntity topologyHostTaskEntity;
 
   @OneToOne
-  @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id")
+  @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id", 
nullable = false)
   private HostRoleCommandEntity hostRoleCommandEntity;
 
   public Long getId() {
@@ -58,14 +70,22 @@ public class TopologyLogicalTaskEntity {
     this.id = id;
   }
 
-  public Long getHostTaskId() {
-    return topologyHostTaskEntity != null ? topologyHostTaskEntity.getId() : 
null;
-  }
-
   public Long getPhysicalTaskId() {
     return hostRoleCommandEntity != null ? hostRoleCommandEntity.getTaskId() : 
null;
   }
 
+  public void setPhysicalTaskId(Long physicalTaskId) {
+    this.physicalTaskId = physicalTaskId;
+  }
+
+  public void setHostTaskId(Long hostTaskId) {
+    this.hostTaskId = hostTaskId;
+  }
+
+  public Long getHostTaskId() {
+    return hostTaskId;
+  }
+
   public String getComponentName() {
     return componentName;
   }
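
TopologyHostTaskEntity and TopologyLogicalTaskEntity now map their foreign-key columns a second time as read-only basic fields (insertable = false, updatable = false) alongside the existing relationships, which lets the bulk queries filter and project on the raw ids without navigating the associations. A generic sketch of that dual mapping, with illustrative entity and column names:

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;

// Illustrative sketch only: mapping a foreign key both as a relationship and as a read-only id column.
@Entity
public class ChildSketchEntity {

  @Id
  private Long id;

  // Read-only view of the FK column; JPQL can reference it directly (e.g. "WHERE child.parentId IN :ids").
  @Column(name = "parent_id", nullable = false, insertable = false, updatable = false)
  private Long parentId;

  // The relationship still owns the column for writes.
  @ManyToOne
  @JoinColumn(name = "parent_id", referencedColumnName = "id", nullable = false)
  private ParentSketchEntity parent;

  public Long getParentId() {
    return parentId;
  }
}

@Entity
class ParentSketchEntity {
  @Id
  private Long id;
}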

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
index 0b27e3b..152fde1 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
@@ -65,6 +65,8 @@ import 
org.apache.ambari.server.state.stack.upgrade.UpgradeType;
       query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON 
u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = 
:direction ORDER BY r.startTime DESC, u.upgradeId DESC"),
   @NamedQuery(name = "UpgradeEntity.findLatestForCluster",
       query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON 
u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime 
DESC"),
+  @NamedQuery(name = "UpgradeEntity.findAllRequestIds",
+      query = "SELECT upgrade.requestId FROM UpgradeEntity upgrade")
 })
 public class UpgradeEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
index 560970a..35ea769 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
@@ -27,6 +27,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
 
@@ -48,6 +50,9 @@ import org.apache.ambari.server.state.UpgradeState;
     pkColumnValue = "upgrade_item_id_seq",
     initialValue = 0,
     allocationSize = 1000)
+@NamedQueries({
+  @NamedQuery(name = "UpgradeItemEntity.findAllStageIds", query = "SELECT 
upgradeItem.stageId FROM UpgradeItemEntity upgradeItem")
+})
 public class UpgradeItemEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py 
b/ambari-server/src/main/python/ambari-server.py
index 47256ab..235ef95 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -199,6 +199,12 @@ def restart(args):
   start(args)
 
 
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
+def database_cleanup(args):
+  logger.info("Database cleanup.")
+  if args.silent:
+    stop(args)
+  db_cleanup(args)
 
 #
 # The Ambari Server status.
@@ -466,7 +472,7 @@ def init_parser_options(parser):
                     help="Print verbose status messages")
   parser.add_option("-s", "--silent",
                     action="store_true", dest="silent", default=False,
-                    help="Silently accepts default prompt values")
+                    help="Silently accepts default prompt values. For the db-cleanup command, silent mode also stops the Ambari Server.")
   parser.add_option('-g', '--debug', action="store_true", dest='debug', 
default=False,
                     help="Start ambari-server in debug mode")
   parser.add_option('-y', '--suspend-start', action="store_true", 
dest='suspend_start', default=False,
@@ -756,7 +762,7 @@ def create_user_action_map(args, options):
         CHECK_DATABASE_ACTION: UserAction(check_database, options),
         ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
         SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
-        DB_CLEANUP_ACTION: UserAction(db_cleanup, options),
+        DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
         INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
         UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
         UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index abc8267..6e16bc5 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -42,25 +42,29 @@ def run_db_cleanup(options):
     if validate_args(options):
         return 1
 
-    db_title = get_db_type(get_ambari_properties()).title
+    status, stateDesc = is_server_runing()
 
-    confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
-            db_title), True)
-    if not confirmBackup:
-        print_info_msg("Ambari Server Database cleanup aborted")
-        return 0
+    if not options.silent:
+      db_title = get_db_type(get_ambari_properties()).title
+
+      confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
+              db_title), True)
+      if not confirmBackup:
+          print_info_msg("Ambari Server Database cleanup aborted")
+          return 0
+
+      if status:
+          print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
+          return 1
+
+      confirm = get_YN_input(
+          "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
+              db_title, options.cleanup_from_date), True)
+      if not confirm:
+          print_info_msg("Ambari Server Database cleanup aborted")
+          return 0
 
-    status, stateDesc = is_server_runing()
-    if status:
-        print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
-        return 1
 
-    confirm = get_YN_input(
-        "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
-            db_title, options.cleanup_from_date), True)
-    if not confirm:
-        print_info_msg("Ambari Server Database cleanup aborted")
-        return 0
 
     jdk_path = get_java_exe_path()
     if jdk_path is None:
@@ -101,7 +105,6 @@ def run_db_cleanup(options):
 # Database cleanup
 #
 def db_cleanup(options):
-    logger.info("Database cleanup.")
     return run_db_cleanup(options)
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
index 5869630..9c8eb74 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
@@ -580,4 +580,70 @@ public class DatabaseConsistencyCheckHelperTest {
   }
 
 
+  @Test
+  public void testCheckForLargeTables() throws Exception {
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariMetaInfo mockAmbariMetainfo = easyMockSupport.createNiceMock(AmbariMetaInfo.class);
+    final DBAccessor mockDBDbAccessor = easyMockSupport.createNiceMock(DBAccessor.class);
+    final Connection mockConnection = easyMockSupport.createNiceMock(Connection.class);
+    final Statement mockStatement = easyMockSupport.createNiceMock(Statement.class);
+    final EntityManager mockEntityManager = easyMockSupport.createNiceMock(EntityManager.class);
+    final Clusters mockClusters = easyMockSupport.createNiceMock(Clusters.class);
+    final OsFamily mockOSFamily = easyMockSupport.createNiceMock(OsFamily.class);
+    final StackManagerFactory mockStackManagerFactory = easyMockSupport.createNiceMock(StackManagerFactory.class);
+
+    final ResultSet hostRoleCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet executionCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet stageResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet requestResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet alertHistoryResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariMetaInfo.class).toInstance(mockAmbariMetainfo);
+        bind(StackManagerFactory.class).toInstance(mockStackManagerFactory);
+        bind(EntityManager.class).toInstance(mockEntityManager);
+        bind(DBAccessor.class).toInstance(mockDBDbAccessor);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(OsFamily.class).toInstance(mockOSFamily);
+      }
+    });
+
+    expect(hostRoleCommandResultSet.next()).andReturn(true).once();
+    expect(executionCommandResultSet.next()).andReturn(true).once();
+    expect(stageResultSet.next()).andReturn(true).once();
+    expect(requestResultSet.next()).andReturn(true).once();
+    expect(alertHistoryResultSet.next()).andReturn(true).once();
+    expect(hostRoleCommandResultSet.getLong(1)).andReturn(2345L).atLeastOnce();
+    expect(executionCommandResultSet.getLong(1)).andReturn(12345L).atLeastOnce();
+    expect(stageResultSet.getLong(1)).andReturn(2321L).atLeastOnce();
+    expect(requestResultSet.getLong(1)).andReturn(1111L).atLeastOnce();
+    expect(alertHistoryResultSet.getLong(1)).andReturn(2223L).atLeastOnce();
+    expect(mockDBDbAccessor.getConnection()).andReturn(mockConnection);
+    expect(mockDBDbAccessor.getDbType()).andReturn(DBAccessor.DbType.MYSQL);
+    expect(mockDBDbAccessor.getDbSchema()).andReturn("test_schema");
+    expect(mockConnection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE)).andReturn(mockStatement).anyTimes();
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"host_role_command\"")).andReturn(hostRoleCommandResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"execution_command\"")).andReturn(executionCommandResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"stage\"")).andReturn(stageResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"request\"")).andReturn(requestResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"alert_history\"")).andReturn(alertHistoryResultSet);
+
+    DatabaseConsistencyCheckHelper.setInjector(mockInjector);
+
+    easyMockSupport.replayAll();
+
+    mockAmbariMetainfo.init();
+
+    DatabaseConsistencyCheckHelper.resetCheckResult();
+    DatabaseConsistencyCheckHelper.checkForLargeTables();
+
+    easyMockSupport.verifyAll();
+  }
 }
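
Note: the expectations above pin down the MySQL statement that checkForLargeTables issues for each of the five tables it inspects (host_role_command, execution_command, stage, request, alert_history): it reads data_length + index_length from information_schema.TABLES. The snippet below is a stand-alone way to run an equivalent check by hand with plain JDBC; the connection URL, credentials and schema name are placeholders, and the statement is rewritten with bind parameters and a standard alias rather than the exact string the helper builds. A MySQL JDBC driver is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class TableSizeProbe {

  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; point them at your Ambari MySQL database.
    try (Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/ambari", "ambari", "password")) {
      String sql = "SELECT (data_length + index_length) AS table_size "
          + "FROM information_schema.TABLES "
          + "WHERE table_schema = ? AND table_name = ?";
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, "ambari");            // schema the Ambari tables live in
        ps.setString(2, "host_role_command"); // one of the tables the check inspects
        try (ResultSet rs = ps.executeQuery()) {
          if (rs.next()) {
            System.out.println("host_role_command size in bytes: " + rs.getLong(1));
          }
        }
      }
    }
  }
}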
