Repository: ambari Updated Branches: refs/heads/branch-2.5 80d804849 -> 32ee5995b
AMBARI-20749. Ambari data purging. (stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/32ee5995 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/32ee5995 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/32ee5995 Branch: refs/heads/branch-2.5 Commit: 32ee5995bdaaabeb52e5aa6ef4533eb70d9bf809 Parents: 80d8048 Author: Toader, Sebastian <stoa...@hortonworks.com> Authored: Wed Jun 14 16:39:07 2017 +0200 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Wed Jun 14 16:39:50 2017 +0200 ---------------------------------------------------------------------- ambari-server/sbin/ambari-server | 6 +-- .../checks/DatabaseConsistencyCheckHelper.java | 4 +- .../ambari/server/cleanup/CleanupDriver.java | 15 +++++--- .../ambari/server/cleanup/CleanupService.java | 19 +++++++++- .../server/cleanup/CleanupServiceImpl.java | 33 ++++++++++++++-- .../server/orm/dao/HostRoleCommandDAO.java | 7 ++-- .../ambari/server/orm/dao/RequestDAO.java | 24 ++++++------ .../server/orm/dao/TopologyHostTaskDAO.java | 7 +++- .../orm/dao/TopologyLogicalRequestDAO.java | 6 ++- .../server/orm/dao/TopologyLogicalTaskDAO.java | 8 +++- .../orm/entities/TopologyHostTaskEntity.java | 2 +- .../entities/TopologyLogicalRequestEntity.java | 2 +- .../orm/entities/TopologyLogicalTaskEntity.java | 2 +- ambari-server/src/main/python/ambari-server.py | 16 ++++---- .../src/main/python/ambari_server/dbCleanup.py | 34 ++++++++--------- .../main/python/ambari_server/setupActions.py | 2 +- .../server/cleanup/CleanupServiceImplTest.java | 40 +++++++++++++++++++- 17 files changed, 162 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/sbin/ambari-server ---------------------------------------------------------------------- diff --git a/ambari-server/sbin/ambari-server b/ambari-server/sbin/ambari-server index 7044096..cc1c923 100755 --- a/ambari-server/sbin/ambari-server +++ b/ambari-server/sbin/ambari-server @@ -174,8 +174,8 @@ case "${1:-}" in echo -e "Setting up SSO authentication properties..." $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@ ;; - db-cleanup) - echo -e "Cleanup database..." + db-purge-history) + echo -e "Purge database history..." $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@ ;; install-mpack) @@ -196,7 +196,7 @@ case "${1:-}" in ;; *) echo "Usage: $AMBARI_EXECUTABLE - {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-cleanup|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options] + {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-purge-history|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options] Use $AMBARI_PYTHON_EXECUTABLE <action> --help to get details on options available. Or, simply invoke ambari-server.py --help to print the options." exit 1 http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java index 70c3661..29c1222 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java @@ -328,7 +328,7 @@ public class DatabaseConsistencyCheckHelper { if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) { warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " + - "that you reduce its size by executing \"ambari-server db-cleanup\".", + "that you reduce its size by executing \"ambari-server db-purge-history\".", tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB); } else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) { LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)", @@ -348,7 +348,7 @@ public class DatabaseConsistencyCheckHelper { if (tableRowCount > TABLE_ROW_COUNT_LIMIT) { warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " + - "recommended that you reduce its size by executing \"ambari-server db-cleanup\".", + "recommended that you reduce its size by executing \"ambari-server db-purge-history\".", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT); } else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) { LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT)); http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java index 788290b..b4c8369 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java @@ -49,7 +49,7 @@ public class CleanupDriver { private static Options getOptions() { Options options = new Options(); options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The cluster name").required().type(String.class).hasArg().valueSeparator(' ').build()); - options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("The day from which the cleanup runs").required().type(String.class).hasArg().valueSeparator(' ').build()); + options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up until data will be purged.").required().type(String.class).hasArg().valueSeparator(' ').build()); return options; } @@ -67,7 +67,7 @@ public class CleanupDriver { } catch (Exception exp) { System.err.println("Parsing failed. Reason: " + exp.getMessage()); LOGGER.error("Parsing failed. Reason: ", exp); - formatter.printHelp("cleanup", getOptions()); + formatter.printHelp("db-purge-history", getOptions()); System.exit(1); } return ctx; @@ -75,7 +75,7 @@ public class CleanupDriver { public static void main(String... args) throws Exception { - LOGGER.info("DB-CLEANUP - Starting the cleanup process ..."); + LOGGER.info("DB-PURGE - Starting the database purge process ..."); CleanupContext cleanupContext = processArguments(args); @@ -86,12 +86,17 @@ public class CleanupDriver { injector.getInstance(AmbariJpaPersistService.class).start(); CleanupServiceImpl cleanupService = injector.getInstance(CleanupServiceImpl.class); - long affected = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp())); + CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp())); // explicitly stopping the persist service injector.getInstance(AmbariJpaPersistService.class).stop(); - LOGGER.info("DB-CLEANUP - completed. Number of affected records [{}]", affected); + if (result.getErrorCount() > 0) { + LOGGER.warn("DB-PURGE - completed with error, check Ambari Server log for details ! Number of affected records [{}]", result.getAffectedRows()); + System.exit(2); + } + + LOGGER.info("DB-PURGE - completed. Number of affected records [{}]", result.getAffectedRows()); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java index 880207c..8ac0b92 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java @@ -23,11 +23,28 @@ package org.apache.ambari.server.cleanup; */ public interface CleanupService<T> { + interface CleanupResult { + /** + * Returns the number of rows deleted by the cleanup + * @return The total number of rows deleted by the cleanup + */ + long getAffectedRows(); + + /** + * The cleanup process executes the specific cleanup operations via + * {@link org.apache.ambari.server.orm.dao.Cleanable} implementations. + * Some of these may fail during the cleanup process. This method returns + * the number of failed clean ups. + * @return The number of failed cleanups. + */ + int getErrorCount(); + } + /** * Triggers the cleanup for the given cleanup policy. * * @param cleanupPolicy the cleanup policy based on which the cleanup is executed. * @return the affected "rows" */ - long cleanup(T cleanupPolicy); + CleanupResult cleanup(T cleanupPolicy); } http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java index 29a9041..0436c92 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java @@ -34,6 +34,25 @@ import com.google.inject.Singleton; public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy> { private static final Logger LOGGER = LoggerFactory.getLogger(CleanupServiceImpl.class); + class Result implements CleanupResult { + private final long affectedRows; + private final int errorCount; + + public Result(long affectedRows, int errorCount) { + this.affectedRows = affectedRows; + this.errorCount = errorCount; + } + + @Override + public long getAffectedRows() { + return affectedRows; + } + + @Override + public int getErrorCount() { + return errorCount; + } + } // this Set is automatically populated by the guice framework (based on the cleanup interface) private Set<Cleanable> cleanables; @@ -54,13 +73,21 @@ public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy * @param cleanupPolicy the policy based on which the cleanup is done * @return the number of affected rows */ - public long cleanup(TimeBasedCleanupPolicy cleanupPolicy) { + public CleanupResult cleanup(TimeBasedCleanupPolicy cleanupPolicy) { long affectedRows = 0; + int errorCount = 0; for (Cleanable cleanable : cleanables) { LOGGER.info("Running the purge process for DAO: [{}] with cleanup policy: [{}]", cleanable, cleanupPolicy); - affectedRows += cleanable.cleanup(cleanupPolicy); + try { + affectedRows += cleanable.cleanup(cleanupPolicy); + } + catch (Exception ex) { + LOGGER.error("Running the purge process for DAO: [{}] failed with: {}", cleanable, ex); + errorCount++; + } } - return affectedRows; + + return new Result(affectedRows, errorCount); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index b9e1fab..14c8443 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -63,6 +63,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -933,9 +934,9 @@ public class HostRoleCommandDAO { } } - public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) { + public Set<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) { EntityManager entityManager = entityManagerProvider.get(); - List<Long> taskIds = new ArrayList<Long>(); + List<Long> taskIds = new ArrayList<>(); for (RequestDAO.StageEntityPK requestIds : requestStageIds) { TypedQuery<Long> hostRoleCommandQuery = entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class); @@ -946,6 +947,6 @@ public class HostRoleCommandDAO { taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery)); } - return taskIds; + return Sets.newHashSet(taskIds); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java index 38c0977..4007237 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java @@ -215,8 +215,8 @@ public class RequestDAO implements Cleanable { * Retrieves from the database for a cluster, or specifically for non-cluster requests. * This method should be considered temporary until Request/Stage/Task cleanup is achieved. * - * @param maxResults the max number to return - * @param ascOrder {@code true} to sort by requestId ascending, {@code false} for descending + * @param limit the max number to return + * @param sortAscending {@code true} to sort by requestId ascending, {@code false} for descending * @param clusterId the cluster to find, or {@code null} to search for requests without cluster */ @RequiresSession @@ -268,12 +268,12 @@ public class RequestDAO implements Cleanable { * Search for all request ids in Upgrade table * @return the list of request ids */ - private List<Long> findAllRequestIdsFromUpgrade() { + private Set<Long> findAllRequestIdsFromUpgrade() { EntityManager entityManager = entityManagerProvider.get(); TypedQuery<Long> upgradeQuery = entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class); - return daoUtils.selectList(upgradeQuery); + return Sets.newHashSet(daoUtils.selectList(upgradeQuery)); } /** @@ -376,7 +376,7 @@ public class RequestDAO implements Cleanable { // find request ids from Upgrade table and exclude these ids from // request ids set that we already have. We don't want to make any changes for upgrade - Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade()); + Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade(); Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator(); while (requestStageIdsIterator.hasNext()) { StageEntityPK nextRequestStageIds = requestStageIdsIterator.next(); @@ -392,24 +392,24 @@ public class RequestDAO implements Cleanable { } // find task ids using request stage ids - Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds)); + Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds); LinkedList<String> params = new LinkedList<>(); params.add("stageId"); params.add("requestId"); // find host task ids, to find related host requests and also to remove needed host tasks - List<Long> hostTaskIds = new ArrayList<>(); + Set<Long> hostTaskIds = new HashSet<>(); if (taskIds != null && !taskIds.isEmpty()) { - hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds)); + hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds); } // find host request ids by host task ids to remove later needed host requests - List<Long> hostRequestIds = new ArrayList<>(); + Set<Long> hostRequestIds = new HashSet<>(); if (!hostTaskIds.isEmpty()) { hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds); } - List<Long> topologyRequestIds = new ArrayList<>(); + Set<Long> topologyRequestIds = new HashSet<>(); if (!hostRequestIds.isEmpty()) { topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds); } @@ -420,9 +420,9 @@ public class RequestDAO implements Cleanable { "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class); affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(), "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class); - affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(), + affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(), "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class); - affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(), + affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(), "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class); for (Long topologyRequestId : topologyRequestIds) { topologyRequestDAO.removeByPK(topologyRequestId); http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java index eea8032..d7fc810 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.orm.dao; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -27,7 +28,9 @@ import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; @Singleton public class TopologyHostTaskDAO { @@ -52,14 +55,14 @@ public class TopologyHostTaskDAO { } @RequiresSession - public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) { + public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) { EntityManager entityManager = entityManagerProvider.get(); TypedQuery<Long> topologyHostTaskQuery = entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class); topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds); - return daoUtils.selectList(topologyHostTaskQuery); + return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery)); } @RequiresSession http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java index 32a38da..bc61e0b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.orm.dao; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -26,6 +27,7 @@ import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; import java.util.List; +import java.util.Set; @Singleton public class TopologyLogicalRequestDAO { @@ -61,13 +63,13 @@ public class TopologyLogicalRequestDAO { } @RequiresSession - public List<Long> findRequestIdsByIds(List<Long> ids) { + public Set<Long> findRequestIdsByIds(Set<Long> ids) { EntityManager entityManager = entityManagerProvider.get(); TypedQuery<Long> topologyLogicalRequestQuery = entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class); topologyLogicalRequestQuery.setParameter("ids", ids); - return daoUtils.selectList(topologyLogicalRequestQuery); + return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery)); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java index 3a72aed..4660766 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.orm.dao; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -26,7 +27,10 @@ import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; + +import java.util.HashSet; import java.util.List; +import java.util.Set; @Singleton public class TopologyLogicalTaskDAO { @@ -42,14 +46,14 @@ public class TopologyLogicalTaskDAO { } @RequiresSession - public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) { + public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) { EntityManager entityManager = entityManagerProvider.get(); TypedQuery<Long> topologyHostTaskQuery = entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class); topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds); - return daoUtils.selectList(topologyHostTaskQuery); + return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery)); } @RequiresSession http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java index 37830b7..549ba7e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java @@ -42,7 +42,7 @@ import java.util.Collection; @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest", query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"), @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", - query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"), + query = "SELECT DISTINCT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"), @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds", query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds") }) http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java index 1536b80..ae13a49 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java @@ -36,7 +36,7 @@ import java.util.Collection; @Entity @Table(name = "topology_logical_request") @NamedQueries({ - @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids") + @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT DISTINCT t.topologyLogicalRequestEntity.topologyRequestId from TopologyHostRequestEntity t WHERE t.id IN :ids") }) public class TopologyLogicalRequestEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java index 2954863..15238ff 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java @@ -36,7 +36,7 @@ import javax.persistence.TableGenerator; pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "topology_logical_task_id_seq", initialValue = 0) @NamedQueries({ - @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"), + @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT DISTINCT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"), @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds") }) public class TopologyLogicalTaskEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari-server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py index 235ef95..71459fb 100755 --- a/ambari-server/src/main/python/ambari-server.py +++ b/ambari-server/src/main/python/ambari-server.py @@ -42,7 +42,7 @@ from ambari_server.setupHttps import setup_https, setup_truststore from ambari_server.setupMpacks import install_mpack, uninstall_mpack, upgrade_mpack, STACK_DEFINITIONS_RESOURCE_NAME, \ SERVICE_DEFINITIONS_RESOURCE_NAME, MPACKS_RESOURCE_NAME from ambari_server.setupSso import setup_sso -from ambari_server.dbCleanup import db_cleanup +from ambari_server.dbCleanup import db_purge from ambari_server.hostUpdate import update_host_names from ambari_server.checkDatabase import check_database from ambari_server.enableStack import enable_stack_version @@ -52,7 +52,7 @@ from ambari_server.setupActions import BACKUP_ACTION, LDAP_SETUP_ACTION, LDAP_SY SETUP_ACTION, SETUP_SECURITY_ACTION,START_ACTION, STATUS_ACTION, STOP_ACTION, RESTART_ACTION, UPGRADE_ACTION, \ UPGRADE_STACK_ACTION, SETUP_JCE_ACTION, SET_CURRENT_ACTION, START_ACTION, STATUS_ACTION, STOP_ACTION, UPGRADE_ACTION, \ UPGRADE_STACK_ACTION, SETUP_JCE_ACTION, SET_CURRENT_ACTION, ENABLE_STACK_ACTION, SETUP_SSO_ACTION, \ - DB_CLEANUP_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION + DB_PURGE_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION from ambari_server.setupSecurity import setup_ldap, sync_ldap, setup_master_key, setup_ambari_krb5_jaas, setup_pam from ambari_server.userInput import get_validated_string_input from ambari_server.kerberos_setup import setup_kerberos @@ -200,11 +200,11 @@ def restart(args): @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT) -def database_cleanup(args): - logger.info("Database cleanup.") +def database_purge(args): + logger.info("Purging historical data from database.") if args.silent: stop(args) - db_cleanup(args) + db_purge(args) # # The Ambari Server status. @@ -472,7 +472,7 @@ def init_parser_options(parser): help="Print verbose status messages") parser.add_option("-s", "--silent", action="store_true", dest="silent", default=False, - help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.") + help="Silently accepts default prompt values. For db-purge-history command, silent mode will stop ambari server.") parser.add_option('-g', '--debug', action="store_true", dest='debug', default=False, help="Start ambari-server in debug mode") parser.add_option('-y', '--suspend-start', action="store_true", dest='suspend_start', default=False, @@ -517,7 +517,7 @@ def init_parser_options(parser): help="Specify stack version that needs to be enabled. All other stacks versions will be disabled") parser.add_option('--stack', dest="stack_name", default=None, type="string", help="Specify stack name for the stack versions that needs to be enabled") - parser.add_option("-d", "--from-date", dest="cleanup_from_date", default=None, type="string", help="Specify date for the cleanup process in 'yyyy-MM-dd' format") + parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, type="string", help="Specify date for the database purge process in 'yyyy-MM-dd' format") add_parser_options('--mpack', default=None, help="Specify the path for management pack to be installed/upgraded", @@ -762,7 +762,7 @@ def create_user_action_map(args, options): CHECK_DATABASE_ACTION: UserAction(check_database, options), ENABLE_STACK_ACTION: UserAction(enable_stack, options, args), SETUP_SSO_ACTION: UserActionRestart(setup_sso, options), - DB_CLEANUP_ACTION: UserAction(database_cleanup, options), + DB_PURGE_ACTION: UserAction(database_purge, options), INSTALL_MPACK_ACTION: UserAction(install_mpack, options), UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options), UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options), http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari_server/dbCleanup.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py index 6e16bc5..2611141 100644 --- a/ambari-server/src/main/python/ambari_server/dbCleanup.py +++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py @@ -37,7 +37,7 @@ DB_CLEANUP_CMD = "{0} -cp {1} org.apache.ambari.server.cleanup.CleanupDriver --c # # Run the db cleanup process # -def run_db_cleanup(options): +def run_db_purge(options): if validate_args(options): return 1 @@ -50,18 +50,18 @@ def run_db_cleanup(options): confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format( db_title), True) if not confirmBackup: - print_info_msg("Ambari Server Database cleanup aborted") + print_info_msg("Ambari Server Database purge aborted") return 0 if status: - print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.") + print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.") return 1 confirm = get_YN_input( - "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format( - db_title, options.cleanup_from_date), True) + "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format( + db_title, options.purge_from_date), True) if not confirm: - print_info_msg("Ambari Server Database cleanup aborted") + print_info_msg("Ambari Server Database purge aborted") return 0 @@ -81,31 +81,31 @@ def run_db_cleanup(options): current_user = ensure_can_start_under_current_user(ambari_user) environ = generate_env(options, ambari_user, current_user) - print "Cleaning up the database ..." - command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.cleanup_from_date) + print "Purging historical data from the database ..." + command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.purge_from_date) (retcode, stdout, stderr) = run_os_command(command, env=environ) print_info_msg("Return code from database cleanup command, retcode = " + str(retcode)) if stdout: - print "Console output from database cleanup command:" + print "Console output from database purge-history command:" print stdout print if stderr: - print "Error output from database cleanup command:" + print "Error output from database purge-history command:" print stderr print if retcode > 0: - print_error_msg("Error wncountered while cleaning up the Ambari Server Database. Check the ambari-server.log for details.") + print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.") else: - print "Cleanup completed. Check the ambari-server.log for details." + print "Purging historical data completed. Check the ambari-server.log for details." return retcode # -# Database cleanup +# Database purge # -def db_cleanup(options): - return run_db_cleanup(options) +def db_purge(options): + return run_db_purge(options) def validate_args(options): @@ -113,12 +113,12 @@ def validate_args(options): print_error_msg("Please provide the --cluster-name argument.") return 1 - if not options.cleanup_from_date: + if not options.purge_from_date: print_error_msg("Please provide the --from-date argument.") return 1 try: - datetime.datetime.strptime(options.cleanup_from_date, "%Y-%m-%d") + datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d") except ValueError as e: print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0])) return 1; http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/main/python/ambari_server/setupActions.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/setupActions.py b/ambari-server/src/main/python/ambari_server/setupActions.py index 358bfc9..142a4d7 100644 --- a/ambari-server/src/main/python/ambari_server/setupActions.py +++ b/ambari-server/src/main/python/ambari_server/setupActions.py @@ -43,7 +43,7 @@ BACKUP_ACTION = "backup" RESTORE_ACTION = "restore" SETUP_JCE_ACTION = "setup-jce" ENABLE_STACK_ACTION = "enable-stack" -DB_CLEANUP_ACTION = "db-cleanup" +DB_PURGE_ACTION = "db-purge-history" INSTALL_MPACK_ACTION = "install-mpack" UNINSTALL_MPACK_ACTION = "uninstall-mpack" UPGRADE_MPACK_ACTION = "upgrade-mpack" http://git-wip-us.apache.org/repos/asf/ambari/blob/32ee5995/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java index 7de5aae..33c2844 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java @@ -32,6 +32,7 @@ import junit.framework.Assert; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; @@ -71,7 +72,7 @@ public class CleanupServiceImplTest { cleanupServiceImpl = new CleanupServiceImpl(cleanables); // WHEN - long rows = cleanupServiceImpl.cleanup(cleanupPolicy); + cleanupServiceImpl.cleanup(cleanupPolicy); // THEN Assert.assertNotNull("The argument is null", timeBasedCleanupPolicyCapture.getValue()); @@ -79,4 +80,41 @@ public class CleanupServiceImplTest { Assert.assertEquals("The to date is wrong!", timeBasedCleanupPolicyCapture.getValue().getToDateInMillis(), FROM_DATE_TIMESTAMP); } + @Test + public void testAffectedRowsNoError() throws Exception { + // GIVEN + cleanables = new HashSet<>(); + cleanables.add(cleanableDao); + expect(cleanableDao.cleanup(cleanupPolicy)).andReturn(2L); + + replay(cleanableDao); + cleanupServiceImpl = new CleanupServiceImpl(cleanables); + + // WHEN + CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy); + + // THEN + Assert.assertEquals("The affected rows count is wrong", 2L, res.getAffectedRows()); + Assert.assertEquals("The error count is wrong", 0L, res.getErrorCount()); + } + + @Test + public void testAffectedRowsWithErrors() throws Exception { + // GIVEN + cleanables = new HashSet<>(); + cleanables.add(cleanableDao); + expect(cleanableDao.cleanup(cleanupPolicy)).andThrow(new RuntimeException()); + + + replay(cleanableDao); + cleanupServiceImpl = new CleanupServiceImpl(cleanables); + + // WHEN + CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy); + + // THEN + Assert.assertEquals("The affected rows count is wrong", 0L, res.getAffectedRows()); + Assert.assertEquals("The error count is wrong", 1L, res.getErrorCount()); + } + } \ No newline at end of file