Repository: ambari Updated Branches: refs/heads/trunk 273653b5a -> 11ab63f7f
AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11ab63f7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11ab63f7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11ab63f7 Branch: refs/heads/trunk Commit: 11ab63f7ffa99a7d381b86282846fa7c80ddc88e Parents: 273653b Author: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Authored: Wed Apr 12 16:30:34 2017 +0300 Committer: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Committed: Wed Apr 12 16:30:34 2017 +0300 ---------------------------------------------------------------------- .../checks/DatabaseConsistencyCheckHelper.java | 117 +++++++++ .../apache/ambari/server/orm/DBAccessor.java | 6 + .../ambari/server/orm/DBAccessorImpl.java | 5 + .../server/orm/dao/HostRoleCommandDAO.java | 16 ++ .../ambari/server/orm/dao/RequestDAO.java | 260 ++++++++++++++++++- .../server/orm/dao/TopologyHostTaskDAO.java | 11 + .../orm/dao/TopologyLogicalRequestDAO.java | 12 + .../server/orm/dao/TopologyLogicalTaskDAO.java | 12 + .../orm/entities/ExecutionCommandEntity.java | 5 + .../orm/entities/HostRoleCommandEntity.java | 10 +- .../server/orm/entities/RequestEntity.java | 6 + .../entities/RequestOperationLevelEntity.java | 4 +- .../entities/RequestResourceFilterEntity.java | 5 + .../orm/entities/RoleSuccessCriteriaEntity.java | 5 + .../ambari/server/orm/entities/StageEntity.java | 6 +- .../orm/entities/TopologyHostRequestEntity.java | 5 + .../orm/entities/TopologyHostTaskEntity.java | 15 +- .../entities/TopologyLogicalRequestEntity.java | 5 + .../orm/entities/TopologyLogicalTaskEntity.java | 30 ++- .../server/orm/entities/UpgradeEntity.java | 2 + .../server/orm/entities/UpgradeItemEntity.java | 5 + ambari-server/src/main/python/ambari-server.py | 10 +- .../src/main/python/ambari_server/dbCleanup.py | 37 +-- .../DatabaseConsistencyCheckHelperTest.java | 66 +++++ 24 files changed, 620 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java index e7e9433..b2a03e4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -46,9 +47,12 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.orm.DBAccessor; import org.apache.ambari.server.orm.dao.ClusterDAO; +import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO; import org.apache.ambari.server.orm.dao.HostComponentStateDAO; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.MetainfoDAO; +import org.apache.ambari.server.orm.dao.StageDAO; import org.apache.ambari.server.orm.entities.ClusterConfigEntity; import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; @@ -81,6 +85,9 @@ public class DatabaseConsistencyCheckHelper { private static Connection connection; private static AmbariMetaInfo ambariMetaInfo; private static DBAccessor dbAccessor; + private static HostRoleCommandDAO hostRoleCommandDAO; + private static ExecutionCommandDAO executionCommandDAO; + private static StageDAO stageDAO; private static DatabaseConsistencyCheckResult checkResult = DatabaseConsistencyCheckResult.DB_CHECK_SUCCESS; @@ -174,6 +181,7 @@ public class DatabaseConsistencyCheckHelper { checkHostComponentStates(); checkServiceConfigs(); checkTopologyTables(); + checkForLargeTables(); LOG.info("******************************* Check database completed *******************************"); return checkResult; } @@ -223,6 +231,115 @@ public class DatabaseConsistencyCheckHelper { } /** + * This method checks if ambari database has tables with too big size (according to limit). + * First of all we are trying to get table size from schema information, but if it's not possible, + * we will get tables rows count and compare it with row count limit. + */ + static void checkForLargeTables() { + LOG.info("Checking for tables with large physical size"); + + ensureConnection(); + + DBAccessor.DbType dbType = dbAccessor.getDbType(); + String schemaName = dbAccessor.getDbSchema(); + + String GET_TABLE_SIZE_IN_BYTES_POSTGRESQL = "SELECT pg_total_relation_size('%s') \"Table Size\""; + String GET_TABLE_SIZE_IN_BYTES_MYSQL = "SELECT (data_length + index_length) \"Table Size\" FROM information_schema.TABLES WHERE table_schema = \"" + schemaName + "\" AND table_name =\"%s\""; + String GET_TABLE_SIZE_IN_BYTES_ORACLE = "SELECT bytes \"Table Size\" FROM user_segments WHERE segment_type='TABLE' AND segment_name='%s'"; + String GET_ROW_COUNT_QUERY = "SELECT COUNT(*) FROM %s"; + + Map<DBAccessor.DbType, String> tableSizeQueryMap = new HashMap<>(); + tableSizeQueryMap.put(DBAccessor.DbType.POSTGRES, GET_TABLE_SIZE_IN_BYTES_POSTGRESQL); + tableSizeQueryMap.put(DBAccessor.DbType.MYSQL, GET_TABLE_SIZE_IN_BYTES_MYSQL); + tableSizeQueryMap.put(DBAccessor.DbType.ORACLE, GET_TABLE_SIZE_IN_BYTES_ORACLE); + + List<String> tablesToCheck = Arrays.asList("host_role_command", "execution_command", "stage", "request", "alert_history"); + + final double TABLE_SIZE_LIMIT_MB = 3000.0; + final int TABLE_ROW_COUNT_LIMIT = 3000000; + + String findTableSizeQuery = tableSizeQueryMap.get(dbType); + + if (dbType == DBAccessor.DbType.ORACLE) { + for (int i = 0;i < tablesToCheck.size(); i++) { + tablesToCheck.set(i, tablesToCheck.get(i).toUpperCase()); + } + } + + for (String tableName : tablesToCheck) { + + ResultSet rs = null; + Statement statement = null; + Double tableSizeInMB = null; + Long tableSizeInBytes = null; + int tableRowCount = -1; + + try { + statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + rs = statement.executeQuery(String.format(findTableSizeQuery, tableName)); + if (rs != null) { + while (rs.next()) { + tableSizeInBytes = rs.getLong(1); + if (tableSizeInBytes != null) { + tableSizeInMB = tableSizeInBytes / 1024.0 / 1024.0; + } + } + } + + if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) { + warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " + + "that you reduce its size by executing \"ambari-server db-cleanup\".", + tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB); + } else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) { + LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)", + tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB)); + } else { + throw new Exception(); + } + } catch (Exception e) { + LOG.error(String.format("Failed to get %s table size from database, will check row count: ", tableName), e); + try { + rs = statement.executeQuery(String.format(GET_ROW_COUNT_QUERY, tableName)); + if (rs != null) { + while (rs.next()) { + tableRowCount = rs.getInt(1); + } + } + + if (tableRowCount > TABLE_ROW_COUNT_LIMIT) { + warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " + + "recommended that you reduce its size by executing \"ambari-server db-cleanup\".", + tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT); + } else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) { + LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT)); + } else { + throw new SQLException(); + } + } catch (SQLException ex) { + LOG.error(String.format("Failed to get %s row count: ", tableName), e); + } + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.error("Exception occurred during result set closing procedure: ", e); + } + } + + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + LOG.error("Exception occurred during statement closing procedure: ", e); + } + } + } + } + + } + + /** * This method checks if any config type in clusterconfig table, has more than * one versions selected. If config version is selected(in selected column = * 1), it means that this version of config is actual. So, if any config type http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java index c132a3d..ae07dc0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java @@ -639,6 +639,12 @@ public interface DBAccessor { DbType getDbType(); /** + * Get database schema name + * @return @dbSchema + */ + String getDbSchema(); + + /** * Capture column type */ class DBColumnInfo { http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java index 1dd3b54..c11589d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java @@ -233,6 +233,11 @@ public class DBAccessorImpl implements DBAccessor { } @Override + public String getDbSchema() { + return dbSchema; + } + + @Override public boolean tableHasData(String tableName) throws SQLException { String query = "SELECT count(*) from " + tableName; Statement statement = getConnection().createStatement(); http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 7318162..6b34575 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -993,4 +993,20 @@ public class HostRoleCommandDAO { return HostRoleCommandEntity_.getPredicateMapping().get(propertyId); } } + + public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) { + EntityManager entityManager = entityManagerProvider.get(); + List<Long> taskIds = new ArrayList<Long>(); + for (RequestDAO.StageEntityPK requestIds : requestStageIds) { + TypedQuery<Long> hostRoleCommandQuery = + entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class); + + hostRoleCommandQuery.setParameter("requestId", requestIds.getRequestId()); + hostRoleCommandQuery.setParameter("stageId", requestIds.getStageId()); + + taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery)); + } + + return taskIds; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java index 2696f66..5d53416 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java @@ -19,27 +19,54 @@ package org.apache.ambari.server.orm.dao; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Set; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy; import org.apache.ambari.server.orm.RequiresSession; +import org.apache.ambari.server.orm.entities.ExecutionCommandEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RequestEntity; +import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity; import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity; +import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity; +import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; +import org.apache.ambari.server.state.Clusters; import org.eclipse.persistence.config.HintValues; import org.eclipse.persistence.config.QueryHints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; @Singleton -public class RequestDAO { +public class RequestDAO implements Cleanable { + + private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class); + + + private static final int BATCH_SIZE = 999; + /** * SQL template to retrieve all request IDs, sorted by the ID. */ @@ -64,6 +91,27 @@ public class RequestDAO { @Inject DaoUtils daoUtils; + @Inject + private Provider<Clusters> m_clusters; + + @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private StageDAO stageDAO; + + @Inject + private TopologyLogicalTaskDAO topologyLogicalTaskDAO; + + @Inject + private TopologyHostTaskDAO topologyHostTaskDAO; + + @Inject + private TopologyLogicalRequestDAO topologyLogicalRequestDAO; + + @Inject + private TopologyRequestDAO topologyRequestDAO; + @RequiresSession public RequestEntity findByPK(Long requestId) { return entityManagerProvider.get().find(RequestEntity.class, requestId); @@ -197,4 +245,214 @@ public class RequestDAO { return daoUtils.selectList(query); } + + public static final class StageEntityPK { + private Long requestId; + private Long stageId; + + public StageEntityPK(Long requestId, Long stageId) { + this.requestId = requestId; + this.stageId = stageId; + } + + public Long getStageId() { + return stageId; + } + + public void setStageId(Long stageId) { + this.stageId = stageId; + } + + public Long getRequestId() { + return requestId; + } + + public void setRequestId(Long requestId) { + this.requestId = requestId; + } + } + + /** + * Search for all request ids in Upgrade table + * @return the list of request ids + */ + private List<Long> findAllRequestIdsFromUpgrade() { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery<Long> upgradeQuery = + entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class); + + return daoUtils.selectList(upgradeQuery); + } + + /** + * Search for all request and stage ids in Request and Stage tables + * @return the list of request/stage ids + */ + public List<StageEntityPK> findRequestAndStageIdsInClusterBeforeDate(Long clusterId, long beforeDateMillis) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery<StageEntityPK> requestQuery = + entityManager.createNamedQuery("RequestEntity.findRequestStageIdsInClusterBeforeDate", StageEntityPK.class); + + requestQuery.setParameter("clusterId", clusterId); + requestQuery.setParameter("beforeDate", beforeDateMillis); + + return daoUtils.selectList(requestQuery); + } + + /** + * In this method we are removing entities using passed ids, + * To prevent issues we are using batch request to remove limited + * count of entities. + * @param ids list of ids that we are using to remove rows from table + * @param paramName name of parameter that we are using in sql query (taskIds, stageIds) + * @param entityName name of entity which we will remove + * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before), + * we are using it only for logging + * @param entityQuery name of NamedQuery which we will use to remove needed entities + * @param type type of entity class which we will use for casting query result + * @return rows count that were removed + */ + @Transactional + protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis, + String entityQuery, Class<T> type) { + LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis))); + EntityManager entityManager = entityManagerProvider.get(); + int affectedRows = 0; + // Batch delete + TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type); + if (ids != null && !ids.isEmpty()) { + for (int i = 0; i < ids.size(); i += BATCH_SIZE) { + int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE); + List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow); + LOG.info("Deleting " + entityName + " entity batch with task ids: " + + idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1)); + query.setParameter(paramName, idsSubList); + affectedRows += query.executeUpdate(); + } + } + + return affectedRows; + } + + /** + * In this method we are removing entities using passed few ids, + * To prevent issues we are using batch request to remove limited + * count of entities. + * @param ids list of ids pairs that we are using to remove rows from table + * @param paramNames list of two names of parameters that we are using in sql query (taskIds, stageIds) + * @param entityName name of entity which we will remove + * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before), + * we are using it only for logging + * @param entityQuery name of NamedQuery which we will use to remove needed entities + * @param type type of entity class which we will use for casting query result + * @return rows count that were removed + */ + @Transactional + protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis, + String entityQuery, Class<T> type) { + LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis))); + EntityManager entityManager = entityManagerProvider.get(); + int affectedRows = 0; + // Batch delete + TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type); + if (ids != null && !ids.isEmpty()) { + for (int i = 0; i < ids.size(); i += BATCH_SIZE) { + int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE); + List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, endRow); + LOG.info("Deleting " + entityName + " entity batch with task ids: " + + idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1)); + for (StageEntityPK requestIds : idsSubList) { + query.setParameter(paramNames.get(0), requestIds.getStageId()); + query.setParameter(paramNames.get(1), requestIds.getRequestId()); + affectedRows += query.executeUpdate(); + } + } + } + + return affectedRows; + } + + @Transactional + @Override + public long cleanup(TimeBasedCleanupPolicy policy) { + long affectedRows = 0; + Long clusterId = null; + try { + clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId(); + // find request and stage ids that were created before date populated by user. + List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis()); + + // find request ids from Upgrade table and exclude these ids from + // request ids set that we already have. We don't want to make any changes for upgrade + Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade()); + Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator(); + while (requestStageIdsIterator.hasNext()) { + StageEntityPK nextRequestStageIds = requestStageIdsIterator.next(); + if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) { + requestStageIdsIterator.remove(); + } + } + + + Set<Long> requestIds = new HashSet<>(); + for (StageEntityPK ids : requestStageIds) { + requestIds.add(ids.getRequestId()); + } + + // find task ids using request stage ids + Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds)); + LinkedList<String> params = new LinkedList<>(); + params.add("stageId"); + params.add("requestId"); + + // find host task ids, to find related host requests and also to remove needed host tasks + List<Long> hostTaskIds = new ArrayList<>(); + if (taskIds != null && !taskIds.isEmpty()) { + hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds)); + } + + // find host request ids by host task ids to remove later needed host requests + List<Long> hostRequestIds = new ArrayList<>(); + if (!hostTaskIds.isEmpty()) { + hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds); + } + + List<Long> topologyRequestIds = new ArrayList<>(); + if (!hostRequestIds.isEmpty()) { + topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds); + } + + + //removing all entities one by one according to their relations using stage, task and request ids + affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(), + "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class); + affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(), + "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class); + affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(), + "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class); + affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(), + "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class); + for (Long topologyRequestId : topologyRequestIds) { + topologyRequestDAO.removeByPK(topologyRequestId); + } + affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(), + "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class); + affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(), + "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class); + affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(), + "StageEntity.removeByRequestStageIds", StageEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(), + "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(), + "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(), + "RequestEntity.removeByRequestIds", RequestEntity.class); + + } catch (AmbariException e) { + LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e); + throw new IllegalStateException(e); + } + + return affectedRows; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java index 02532db..1b18ffe 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java @@ -54,6 +54,17 @@ public class TopologyHostTaskDAO { } @RequiresSession + public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery<Long> topologyHostTaskQuery = + entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class); + + topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds); + + return daoUtils.selectList(topologyHostTaskQuery); + } + + @RequiresSession public List<TopologyHostTaskEntity> findAll() { return daoUtils.selectAll(entityManagerProvider.get(), TopologyHostTaskEntity.class); } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java index e917dc2..ce1131a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao; import java.util.List; import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; @@ -61,4 +62,15 @@ public class TopologyLogicalRequestDAO { public void remove(TopologyLogicalRequestEntity requestEntity) { entityManagerProvider.get().remove(requestEntity); } + + @RequiresSession + public List<Long> findRequestIdsByIds(List<Long> ids) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery<Long> topologyLogicalRequestQuery = + entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class); + + topologyLogicalRequestQuery.setParameter("ids", ids); + + return daoUtils.selectList(topologyLogicalRequestQuery); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java index 35f47a7..780a3ba 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao; import java.util.List; import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; @@ -43,6 +44,17 @@ public class TopologyLogicalTaskDAO { } @RequiresSession + public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery<Long> topologyHostTaskQuery = + entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class); + + topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds); + + return daoUtils.selectList(topologyHostTaskQuery); + } + + @RequiresSession public List<TopologyLogicalTaskEntity> findAll() { return daoUtils.selectAll(entityManagerProvider.get(), TopologyLogicalTaskEntity.class); } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java index 85f3a25..7015709 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java @@ -26,11 +26,16 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.Lob; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToOne; import javax.persistence.Table; @Table(name = "execution_command") @Entity +@NamedQueries({ + @NamedQuery(name = "ExecutionCommandEntity.removeByTaskIds", query = "DELETE FROM ExecutionCommandEntity command WHERE command.taskId IN :taskIds") +}) public class ExecutionCommandEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java index fdec5f0..6197940 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java @@ -60,6 +60,7 @@ import org.apache.commons.lang.ArrayUtils; , initialValue = 1 ) @NamedQueries({ + @NamedQuery(name = "HostRoleCommandEntity.findTaskIdsByRequestStageIds", query = "SELECT command.taskId FROM HostRoleCommandEntity command WHERE command.stageId = :stageId AND command.requestId = :requestId"), @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"), @NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", query="SELECT task FROM HostRoleCommandEntity task WHERE task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId ASC"), @NamedQuery(name = "HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", query = "SELECT task FROM HostRoleCommandEntity task WHERE task.requestId = :requestId AND task.status IN :statuses ORDER BY task.taskId DESC"), @@ -71,12 +72,9 @@ import org.apache.commons.lang.ArrayUtils; @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"), @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand <> :roleCommand"), @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand"), - @NamedQuery( - name = "HostRoleCommandEntity.findHostsByCommandStatus", - query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"), - @NamedQuery( - name = "HostRoleCommandEntity.getBlockingHostsForRequest", - query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL") + @NamedQuery(name = "HostRoleCommandEntity.removeByTaskIds", query = "DELETE FROM HostRoleCommandEntity command WHERE command.taskId IN :taskIds"), + @NamedQuery(name = "HostRoleCommandEntity.findHostsByCommandStatus", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"), + @NamedQuery(name = "HostRoleCommandEntity.getBlockingHostsForRequest", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL") }) public class HostRoleCommandEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java index f19aa72..099d08f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java @@ -30,6 +30,8 @@ import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.Lob; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.OneToOne; import javax.persistence.Table; @@ -39,6 +41,10 @@ import org.apache.ambari.server.actionmanager.RequestType; @Table(name = "request") @Entity +@NamedQueries({ + @NamedQuery(name = "RequestEntity.findRequestStageIdsInClusterBeforeDate", query = "SELECT NEW org.apache.ambari.server.orm.dao.RequestDAO.StageEntityPK(request.requestId, stage.stageId) FROM RequestEntity request JOIN StageEntity stage ON request.requestId = stage.requestId WHERE request.clusterId = :clusterId AND request.createTime <= :beforeDate"), + @NamedQuery(name = "RequestEntity.removeByRequestIds", query = "DELETE FROM RequestEntity request WHERE request.requestId IN :requestIds") +}) public class RequestEntity { @Column(name = "request_id") http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java index ff14e3a..a7cd0d0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java @@ -40,7 +40,9 @@ import javax.persistence.TableGenerator; @NamedQueries({ @NamedQuery(name = "requestOperationLevelByHostId", query = "SELECT requestOperationLevel FROM RequestOperationLevelEntity requestOperationLevel " + - "WHERE requestOperationLevel.hostId=:hostId") + "WHERE requestOperationLevel.hostId=:hostId"), + @NamedQuery(name = "RequestOperationLevelEntity.removeByRequestIds", + query = "DELETE FROM RequestOperationLevelEntity requestOperationLevel WHERE requestOperationLevel.requestId IN :requestIds") }) public class RequestOperationLevelEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java index 8ee41d2..9597db1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java @@ -26,6 +26,8 @@ import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.Lob; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -36,6 +38,9 @@ import javax.persistence.TableGenerator; , pkColumnValue = "resourcefilter_id_seq" , initialValue = 1 ) +@NamedQueries({ + @NamedQuery(name = "RequestResourceFilterEntity.removeByRequestIds", query = "DELETE FROM RequestResourceFilterEntity filter WHERE filter.requestId IN :requestIds") +}) public class RequestResourceFilterEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java index 3386c24..66e7fd8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java @@ -26,6 +26,8 @@ import javax.persistence.IdClass; import javax.persistence.JoinColumn; import javax.persistence.JoinColumns; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import org.apache.ambari.server.Role; @@ -33,6 +35,9 @@ import org.apache.ambari.server.Role; @IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class) @Table(name = "role_success_criteria") @Entity +@NamedQueries({ + @NamedQuery(name = "RoleSuccessCriteriaEntity.removeByRequestStageIds", query = "DELETE FROM RoleSuccessCriteriaEntity criteria WHERE criteria.stageId = :stageId AND criteria.requestId = :requestId") +}) public class RoleSuccessCriteriaEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java index d035729..f688412 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java @@ -50,7 +50,11 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus; query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId = stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId ORDER BY stage.requestId"), @NamedQuery( name = "StageEntity.findByRequestIdAndCommandStatuses", - query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId") }) + query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"), + @NamedQuery( + name = "StageEntity.removeByRequestStageIds", + query = "DELETE FROM StageEntity stage WHERE stage.stageId = :stageId AND stage.requestId = :requestId") +}) public class StageEntity { @Basic http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java index b90e192..2700f68 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java @@ -25,11 +25,16 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.Table; @Entity @Table(name = "topology_host_request") +@NamedQueries({ + @NamedQuery(name = "TopologyHostRequestEntity.removeByIds", query = "DELETE FROM TopologyHostRequestEntity topologyHostRequest WHERE topologyHostRequest.id IN :hostRequestIds") +}) public class TopologyHostRequestEntity { @Id // @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_host_request_id_generator") http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java index bba0e06..0bb3e19 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java @@ -40,7 +40,11 @@ import javax.persistence.TableGenerator; pkColumnValue = "topology_host_task_id_seq", initialValue = 0) @NamedQueries({ @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest", - query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId") + query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"), + @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", + query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"), + @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds", + query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds") }) public class TopologyHostTaskEntity { @Id @@ -51,6 +55,9 @@ public class TopologyHostTaskEntity { @Column(name = "type", length = 255, nullable = false) private String type; + @Column(name = "host_request_id", nullable = false, insertable = false, updatable = false) + private Long hostRequestId; + @ManyToOne @JoinColumn(name = "host_request_id", referencedColumnName = "id", nullable = false) private TopologyHostRequestEntity topologyHostRequestEntity; @@ -67,7 +74,11 @@ public class TopologyHostTaskEntity { } public Long getHostRequestId() { - return topologyHostRequestEntity != null ? topologyHostRequestEntity.getId() : null; + return hostRequestId; + } + + public void setHostRequestId(Long hostRequestId) { + this.hostRequestId = hostRequestId; } public String getType() { http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java index 4f865f4..605a043 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java @@ -24,12 +24,17 @@ import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.JoinColumn; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.OneToOne; import javax.persistence.Table; @Entity @Table(name = "topology_logical_request") +@NamedQueries({ + @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids") +}) public class TopologyLogicalRequestEntity { @Id @Column(name = "id", nullable = false, updatable = false) http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java index c71d4e4..2954863 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java @@ -24,6 +24,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToOne; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -33,6 +35,10 @@ import javax.persistence.TableGenerator; @TableGenerator(name = "topology_logical_task_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "topology_logical_task_id_seq", initialValue = 0) +@NamedQueries({ + @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"), + @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds") +}) public class TopologyLogicalTaskEntity { @Id @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_logical_task_id_generator") @@ -42,12 +48,18 @@ public class TopologyLogicalTaskEntity { @Column(name = "component", length = 255) private String componentName; + @Column(name = "host_task_id", nullable = false, insertable = false, updatable = false) + private Long hostTaskId; + + @Column(name = "physical_task_id", nullable = false, insertable = false, updatable = false) + private Long physicalTaskId; + @ManyToOne @JoinColumn(name = "host_task_id", referencedColumnName = "id", nullable = false) private TopologyHostTaskEntity topologyHostTaskEntity; @OneToOne - @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id") + @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id", nullable = false) private HostRoleCommandEntity hostRoleCommandEntity; public Long getId() { @@ -58,14 +70,22 @@ public class TopologyLogicalTaskEntity { this.id = id; } - public Long getHostTaskId() { - return topologyHostTaskEntity != null ? topologyHostTaskEntity.getId() : null; - } - public Long getPhysicalTaskId() { return hostRoleCommandEntity != null ? hostRoleCommandEntity.getTaskId() : null; } + public void setPhysicalTaskId(Long physicalTaskId) { + this.physicalTaskId = physicalTaskId; + } + + public void setHostTaskId(Long hostTaskId) { + this.hostTaskId = hostTaskId; + } + + public Long getHostTaskId() { + return hostTaskId; + } + public String getComponentName() { return componentName; } http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java index 89574bc..bea1d19 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java @@ -63,6 +63,8 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType; query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC, u.upgradeId DESC"), @NamedQuery(name = "UpgradeEntity.findLatestForCluster", query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"), + @NamedQuery(name = "UpgradeEntity.findAllRequestIds", + query = "SELECT upgrade.requestId FROM UpgradeEntity upgrade") }) public class UpgradeEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java index 560970a..35ea769 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java @@ -27,6 +27,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -48,6 +50,9 @@ import org.apache.ambari.server.state.UpgradeState; pkColumnValue = "upgrade_item_id_seq", initialValue = 0, allocationSize = 1000) +@NamedQueries({ + @NamedQuery(name = "UpgradeItemEntity.findAllStageIds", query = "SELECT upgradeItem.stageId FROM UpgradeItemEntity upgradeItem") +}) public class UpgradeItemEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari-server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py index 87cc6c2..737be6a 100755 --- a/ambari-server/src/main/python/ambari-server.py +++ b/ambari-server/src/main/python/ambari-server.py @@ -199,6 +199,12 @@ def restart(args): start(args) +@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT) +def database_cleanup(args): + logger.info("Database cleanup.") + if args.silent: + stop(args) + db_cleanup(args) # # The Ambari Server status. @@ -469,7 +475,7 @@ def init_parser_options(parser): help="Print verbose status messages") parser.add_option("-s", "--silent", action="store_true", dest="silent", default=False, - help="Silently accepts default prompt values") + help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.") parser.add_option('-g', '--debug', action="store_true", dest='debug', default=False, help="Start ambari-server in debug mode") parser.add_option('-y', '--suspend-start', action="store_true", dest='suspend_start', default=False, @@ -759,7 +765,7 @@ def create_user_action_map(args, options): CHECK_DATABASE_ACTION: UserAction(check_database, options), ENABLE_STACK_ACTION: UserAction(enable_stack, options, args), SETUP_SSO_ACTION: UserActionRestart(setup_sso, options), - DB_CLEANUP_ACTION: UserAction(db_cleanup, options), + DB_CLEANUP_ACTION: UserAction(database_cleanup, options), INSTALL_MPACK_ACTION: UserAction(install_mpack, options), UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options), UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options), http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari_server/dbCleanup.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py index abc8267..6e16bc5 100644 --- a/ambari-server/src/main/python/ambari_server/dbCleanup.py +++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py @@ -42,25 +42,29 @@ def run_db_cleanup(options): if validate_args(options): return 1 - db_title = get_db_type(get_ambari_properties()).title + status, stateDesc = is_server_runing() - confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format( - db_title), True) - if not confirmBackup: - print_info_msg("Ambari Server Database cleanup aborted") - return 0 + if not options.silent: + db_title = get_db_type(get_ambari_properties()).title + + confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format( + db_title), True) + if not confirmBackup: + print_info_msg("Ambari Server Database cleanup aborted") + return 0 + + if status: + print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.") + return 1 + + confirm = get_YN_input( + "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format( + db_title, options.cleanup_from_date), True) + if not confirm: + print_info_msg("Ambari Server Database cleanup aborted") + return 0 - status, stateDesc = is_server_runing() - if status: - print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.") - return 1 - confirm = get_YN_input( - "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format( - db_title, options.cleanup_from_date), True) - if not confirm: - print_info_msg("Ambari Server Database cleanup aborted") - return 0 jdk_path = get_java_exe_path() if jdk_path is None: @@ -101,7 +105,6 @@ def run_db_cleanup(options): # Database cleanup # def db_cleanup(options): - logger.info("Database cleanup.") return run_db_cleanup(options) http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java index 7d8ba50..d6e12dc 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java @@ -533,4 +533,70 @@ public class DatabaseConsistencyCheckHelperTest { } + @Test + public void testCheckForLargeTables() throws Exception { + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final AmbariMetaInfo mockAmbariMetainfo = easyMockSupport.createNiceMock(AmbariMetaInfo.class); + final DBAccessor mockDBDbAccessor = easyMockSupport.createNiceMock(DBAccessor.class); + final Connection mockConnection = easyMockSupport.createNiceMock(Connection.class); + final Statement mockStatement = easyMockSupport.createNiceMock(Statement.class); + final EntityManager mockEntityManager = easyMockSupport.createNiceMock(EntityManager.class); + final Clusters mockClusters = easyMockSupport.createNiceMock(Clusters.class); + final OsFamily mockOSFamily = easyMockSupport.createNiceMock(OsFamily.class); + final StackManagerFactory mockStackManagerFactory = easyMockSupport.createNiceMock(StackManagerFactory.class); + + final ResultSet hostRoleCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet executionCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet stageResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet requestResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet alertHistoryResultSet = easyMockSupport.createNiceMock(ResultSet.class); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariMetaInfo.class).toInstance(mockAmbariMetainfo); + bind(StackManagerFactory.class).toInstance(mockStackManagerFactory); + bind(EntityManager.class).toInstance(mockEntityManager); + bind(DBAccessor.class).toInstance(mockDBDbAccessor); + bind(Clusters.class).toInstance(mockClusters); + bind(OsFamily.class).toInstance(mockOSFamily); + } + }); + + expect(hostRoleCommandResultSet.next()).andReturn(true).once(); + expect(executionCommandResultSet.next()).andReturn(true).once(); + expect(stageResultSet.next()).andReturn(true).once(); + expect(requestResultSet.next()).andReturn(true).once(); + expect(alertHistoryResultSet.next()).andReturn(true).once(); + expect(hostRoleCommandResultSet.getLong(1)).andReturn(2345L).atLeastOnce(); + expect(executionCommandResultSet.getLong(1)).andReturn(12345L).atLeastOnce(); + expect(stageResultSet.getLong(1)).andReturn(2321L).atLeastOnce(); + expect(requestResultSet.getLong(1)).andReturn(1111L).atLeastOnce(); + expect(alertHistoryResultSet.getLong(1)).andReturn(2223L).atLeastOnce(); + expect(mockDBDbAccessor.getConnection()).andReturn(mockConnection); + expect(mockDBDbAccessor.getDbType()).andReturn(DBAccessor.DbType.MYSQL); + expect(mockDBDbAccessor.getDbSchema()).andReturn("test_schema"); + expect(mockConnection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE)).andReturn(mockStatement).anyTimes(); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"host_role_command\"")).andReturn(hostRoleCommandResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"execution_command\"")).andReturn(executionCommandResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"stage\"")).andReturn(stageResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"request\"")).andReturn(requestResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"alert_history\"")).andReturn(alertHistoryResultSet); + + DatabaseConsistencyCheckHelper.setInjector(mockInjector); + + easyMockSupport.replayAll(); + + mockAmbariMetainfo.init(); + + DatabaseConsistencyCheckHelper.resetCheckResult(); + DatabaseConsistencyCheckHelper.checkForLargeTables(); + + easyMockSupport.verifyAll(); + } }