This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f944f813a ATLAS-4768: Implement aging for audits stored by Atlas.
f944f813a is described below

commit f944f813a04ae91aba94f3622e1213da34dab30a
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Fri Sep 29 10:39:51 2023 -0700

    ATLAS-4768: Implement aging for audits stored by Atlas.
    
    Signed-off-by: radhikakundam <radhikakun...@apache.org>
    (cherry picked from commit 79ef3cce1be4874c17b296aff8b200c9edb42bcb)
---
 .../main/java/org/apache/atlas/AtlasClientV2.java  |  10 +
 .../org/apache/atlas/repository/Constants.java     |  23 ++
 .../java/org/apache/atlas/AtlasConfiguration.java  |  18 +-
 .../atlas/model/audit/AuditReductionCriteria.java  | 226 +++++++++++++
 .../atlas/discovery/AtlasDiscoveryService.java     |  18 +
 .../atlas/discovery/EntityDiscoveryService.java    |  60 ++++
 .../audit/AbstractStorageBasedAuditRepository.java |  11 +-
 .../audit/AtlasAuditReductionService.java          | 363 +++++++++++++++++++++
 .../repository/audit/EntityAuditRepository.java    |  12 +
 .../audit/HBaseBasedAuditRepository.java           | 158 ++++++++-
 .../audit/InMemoryEntityAuditRepository.java       |  53 ++-
 .../audit/NoopEntityAuditRepository.java           |   7 +
 .../repository/graph/GraphBackedSearchIndexer.java |   3 +
 .../tasks/AuditReductionEntityRetrievalTask.java   | 231 +++++++++++++
 .../store/graph/v2/tasks/AuditReductionTask.java   | 150 +++++++++
 .../graph/v2/tasks/AuditReductionTaskFactory.java  | 113 +++++++
 .../repository/audit/AuditRepositoryTestBase.java  |  67 +++-
 .../resources/solr/core-template/solrconfig.xml    |   2 +-
 .../atlas/web/filters/ActiveServerFilter.java      |   2 +-
 .../apache/atlas/web/resources/AdminResource.java  |  46 ++-
 .../atlas/web/resources/AdminResourceTest.java     |   4 +-
 21 files changed, 1543 insertions(+), 34 deletions(-)

diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java 
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6f97da192..6477b4091 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -31,6 +31,7 @@ import com.sun.jersey.multipart.file.StreamDataBodyPart;
 import org.apache.atlas.bulkimport.BulkImportResponse;
 import org.apache.atlas.model.SearchFilter;
 import org.apache.atlas.model.audit.AtlasAuditEntry;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
 import org.apache.atlas.model.audit.AuditSearchParameters;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
@@ -838,6 +839,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
         });
     }
 
+    public void ageoutAtlasAudits(AuditReductionCriteria 
auditReductionCriteria, boolean useAuditConfig) throws AtlasServiceException {
+        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+
+        queryParams.add("useAuditConfig", String.valueOf(useAuditConfig));
+
+        callAPI(API_V2.AGEOUT_ATLAS_AUDITS, List.class, 
auditReductionCriteria, queryParams);
+    }
+
 
     // Glossary APIs
     public List<AtlasGlossary> getAllGlossaries(String sortByAttribute, int 
limit, int offset) throws AtlasServiceException {
@@ -1269,6 +1278,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
 
         // Admin APIs
         public static final API_V2 GET_ATLAS_AUDITS            = new 
API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK);
+        public static final API_V2 AGEOUT_ATLAS_AUDITS         = new 
API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK);
 
         // Glossary APIs
         public static final API_V2 GET_ALL_GLOSSARIES              = new 
API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK);
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java 
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 51b093284..2134133f5 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -239,6 +239,15 @@ public final class Constants {
     public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME   = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
     public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
 
+    /**
+     * Audit Reduction vertex property keys.
+     */
+    public static final String AUDIT_REDUCTION_PREFIX                  = 
INTERNAL_PROPERTY_KEY_PREFIX + "auditReduction_";
+    public static final String PROPERTY_KEY_AUDIT_REDUCTION_NAME       = 
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "name");
+    public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT = 
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "default");
+    public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM  = 
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
+    public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT          = 
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");
+
     public static final String SQOOP_SOURCE       = "sqoop";
     public static final String FALCON_SOURCE      = "falcon";
     public static final String HBASE_SOURCE       = "hbase";
@@ -248,6 +257,20 @@ public final class Constants {
     public static final String STORM_SOURCE       = "storm";
     public static final String FILE_SPOOL_SOURCE  = "file_spool";
 
+    /**
+     * Audit Reduction related constants
+     */
+    public enum AtlasAuditAgingType { DEFAULT, CUSTOM, SWEEP }
+    public static final String AUDIT_REDUCTION_TYPE_NAME            = 
"__auditReductionInfo";
+    public static final String AUDIT_AGING_TYPE_KEY                 = 
"auditAgingType";
+    public static final String AUDIT_AGING_TTL_KEY                  = "ttl";
+    public static final String AUDIT_AGING_COUNT_KEY                = 
"auditCount";
+    public static final String AUDIT_AGING_ENTITY_TYPES_KEY         = 
"entityTypes";
+    public static final String AUDIT_AGING_ACTION_TYPES_KEY         = 
"actionTypes";
+    public static final String AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY = 
"excludeEntityTypes";
+    public static final String CREATE_EVENTS_AGEOUT_ALLOWED_KEY     = 
"createEventsAgeoutAllowed";
+    public static final String AUDIT_AGING_SUBTYPES_INCLUDED_KEY    = 
"subTypesIncluded";
+
     /*
      * All supported file-format extensions for Bulk Imports through file 
upload
      */
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 58a2fa725..090889e1c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -93,7 +93,23 @@ public enum AtlasConfiguration {
     SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
     UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
     METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days 
default
-       SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 
days default
+       SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240), //10 
days default
+
+    ATLAS_AUDIT_AGING_ENABLED("atlas.audit.aging.enabled", false),
+    ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED("atlas.audit.default.ageout.enabled", 
true),
+    ATLAS_AUDIT_DEFAULT_AGEOUT_TTL("atlas.audit.default.ageout.ttl.in.days", 
90),
+    ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT("atlas.audit.default.ageout.count", 0),
+    ATLAS_AUDIT_CUSTOM_AGEOUT_TTL("atlas.audit.custom.ageout.ttl.in.days", 0),
+    ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT("atlas.audit.custom.ageout.count", 0),
+    ATLAS_AUDIT_SWEEP_OUT("atlas.audit.sweep.out.enabled", false),
+    
ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED("atlas.audit.create.events.ageout.allowed",
 false),
+    
ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY("atlas.audit.aging.scheduler.frequency.in.days",
 30),
+    ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED("atlas.audit.aging.subtypes.included", 
true),
+    MIN_TTL_TO_MAINTAIN("atlas.audit.min.ttl.to.maintain", 7),
+    MIN_AUDIT_COUNT_TO_MAINTAIN("atlas.audit.min.count.to.maintain", 50),
+    ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT("atlas.audit.aging.search.maxlimit", 
10000),
+    
ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", 
false),
+    
ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", 
false); //Only for test automation
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git 
a/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java 
b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java
new file mode 100644
index 000000000..c956b3ac8
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.audit;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AuditReductionCriteria implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private boolean auditAgingEnabled         = false;
+    private boolean defaultAgeoutEnabled      = false;
+    private boolean auditSweepoutEnabled      = false;
+    private boolean createEventsAgeoutAllowed = false;
+    private boolean subTypesIncluded          = true;
+    private boolean ignoreDefaultAgeoutTTL    = false;
+
+    private int defaultAgeoutAuditCount;
+    private int defaultAgeoutTTLInDays;
+    private int customAgeoutAuditCount;
+    private int customAgeoutTTLInDays;
+
+    private String customAgeoutEntityTypes;
+    private String customAgeoutActionTypes;
+
+    private String sweepoutEntityTypes;
+    private String sweepoutActionTypes;
+
+    public boolean isAuditAgingEnabled() {
+        return auditAgingEnabled;
+    }
+
+    public void setAuditAgingEnabled(boolean auditAgingEnabled) {
+        this.auditAgingEnabled = auditAgingEnabled;
+    }
+
+    public boolean isDefaultAgeoutEnabled() {
+        return defaultAgeoutEnabled;
+    }
+
+    public void setDefaultAgeoutEnabled(boolean defaultAgeoutEnabled) {
+        this.defaultAgeoutEnabled = defaultAgeoutEnabled;
+    }
+
+    public boolean isAuditSweepoutEnabled() {
+        return auditSweepoutEnabled;
+    }
+
+    public void setAuditSweepoutEnabled(boolean auditSweepoutEnabled) {
+        this.auditSweepoutEnabled = auditSweepoutEnabled;
+    }
+
+    public boolean isCreateEventsAgeoutAllowed() {
+        return createEventsAgeoutAllowed;
+    }
+
+    public void setCreateEventsAgeoutAllowed(boolean 
createEventsAgeoutAllowed) {
+        this.createEventsAgeoutAllowed = createEventsAgeoutAllowed;
+    }
+
+    public boolean isSubTypesIncluded() {
+        return subTypesIncluded;
+    }
+
+    public void setSubTypesIncluded(boolean subTypesIncluded) {
+        this.subTypesIncluded = subTypesIncluded;
+    }
+
+    public boolean ignoreDefaultAgeoutTTL() {
+        return ignoreDefaultAgeoutTTL;
+    }
+
+    public void setIgnoreDefaultAgeoutTTL(boolean ignoreDefaultAgeoutTTL) {
+        this.ignoreDefaultAgeoutTTL = ignoreDefaultAgeoutTTL;
+    }
+
+    public int getDefaultAgeoutTTLInDays() {
+        return defaultAgeoutTTLInDays;
+    }
+
+    public void setDefaultAgeoutTTLInDays(int defaultAgeoutTTLInDays) {
+        this.defaultAgeoutTTLInDays = defaultAgeoutTTLInDays;
+    }
+
+    public int getDefaultAgeoutAuditCount() {
+        return defaultAgeoutAuditCount;
+    }
+
+    public void setDefaultAgeoutAuditCount(int defaultAgeoutAuditCount) {
+        this.defaultAgeoutAuditCount = defaultAgeoutAuditCount;
+    }
+
+    public int getCustomAgeoutTTLInDays() {
+        return customAgeoutTTLInDays;
+    }
+
+    public void setCustomAgeoutTTLInDays(int customAgeoutTTLInDays) {
+        this.customAgeoutTTLInDays = customAgeoutTTLInDays;
+    }
+
+    public int getCustomAgeoutAuditCount() {
+        return customAgeoutAuditCount;
+    }
+
+    public void setCustomAgeoutAuditCount(int customAgeoutAuditCount) {
+        this.customAgeoutAuditCount = customAgeoutAuditCount;
+    }
+
+    public String getCustomAgeoutEntityTypes() {
+        return customAgeoutEntityTypes;
+    }
+
+    public void setCustomAgeoutEntityTypes(String customAgeoutEntityTypes) {
+        this.customAgeoutEntityTypes = customAgeoutEntityTypes;
+    }
+
+    public String getCustomAgeoutActionTypes() {
+        return customAgeoutActionTypes;
+    }
+
+    public void setCustomAgeoutActionTypes(String customAgeoutActionTypes) {
+        this.customAgeoutActionTypes = customAgeoutActionTypes;
+    }
+
+    public String getSweepoutEntityTypes() {
+        return sweepoutEntityTypes;
+    }
+
+    public void setSweepoutEntityTypes(String sweepoutEntityTypes) {
+        this.sweepoutEntityTypes = sweepoutEntityTypes;
+    }
+
+    public String getSweepoutActionTypes() {
+        return sweepoutActionTypes;
+    }
+
+    public void setSweepoutActionTypes(String sweepoutActionTypes) {
+        this.sweepoutActionTypes = sweepoutActionTypes;
+    }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AuditReductionCriteria that = (AuditReductionCriteria) o;
+        return auditAgingEnabled == that.auditAgingEnabled &&
+                defaultAgeoutEnabled == that.defaultAgeoutEnabled &&
+                auditSweepoutEnabled == that.auditSweepoutEnabled &&
+                createEventsAgeoutAllowed == that.createEventsAgeoutAllowed &&
+                subTypesIncluded == that.subTypesIncluded &&
+                ignoreDefaultAgeoutTTL == that.ignoreDefaultAgeoutTTL &&
+                defaultAgeoutAuditCount == that.defaultAgeoutAuditCount &&
+                defaultAgeoutTTLInDays == that.defaultAgeoutTTLInDays &&
+                customAgeoutAuditCount == that.customAgeoutAuditCount &&
+                customAgeoutTTLInDays == that.customAgeoutTTLInDays &&
+                Objects.equals(customAgeoutEntityTypes, 
that.customAgeoutEntityTypes) &&
+                Objects.equals(customAgeoutActionTypes, 
that.customAgeoutActionTypes) &&
+                Objects.equals(sweepoutEntityTypes, that.sweepoutEntityTypes) 
&&
+                Objects.equals(sweepoutActionTypes, that.sweepoutActionTypes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(auditAgingEnabled, defaultAgeoutEnabled, 
auditSweepoutEnabled, createEventsAgeoutAllowed, subTypesIncluded, 
ignoreDefaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutTTLInDays, 
customAgeoutAuditCount, customAgeoutTTLInDays,
+                            customAgeoutEntityTypes, customAgeoutActionTypes, 
sweepoutEntityTypes, sweepoutActionTypes);
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append('{');
+        
sb.append("auditAgingEnabled='").append(auditAgingEnabled).append('\'');
+        sb.append(", 
createEventsAgeoutAllowed='").append(createEventsAgeoutAllowed).append('\'');
+        sb.append(", 
subTypesIncluded='").append(subTypesIncluded).append('\'');
+        sb.append(", 
defaultAgeoutEnabled='").append(defaultAgeoutEnabled).append('\'');
+        sb.append(", 
ignoreDefaultAgeoutTTL='").append(ignoreDefaultAgeoutTTL).append('\'');
+        sb.append(", 
defaultAgeoutTTLInDays='").append(defaultAgeoutTTLInDays).append('\'');
+        sb.append(", 
defaultAgeoutAuditCount='").append(defaultAgeoutAuditCount).append('\'');
+        sb.append(", 
auditSweepoutEnabled='").append(auditSweepoutEnabled).append('\'');
+        sb.append(", 
customAgeoutAuditCount='").append(customAgeoutAuditCount).append('\'');
+        sb.append(", 
customAgeoutTTLInDays='").append(customAgeoutTTLInDays).append('\'');
+        sb.append(", 
customAgeoutEntityTypes=").append(customAgeoutEntityTypes);
+        sb.append(", 
customAgeoutActionTypes=").append(customAgeoutActionTypes);
+        sb.append(", sweepoutEntityTypes=").append(sweepoutEntityTypes);
+        sb.append(", sweepoutActionTypes=").append(sweepoutActionTypes);
+        sb.append('}');
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
 
b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index f8e55b886..acab4ce0a 100644
--- 
a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ 
b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -22,10 +22,13 @@ package org.apache.atlas.discovery;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.*;
 import org.apache.atlas.model.profile.AtlasUserSavedSearch;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface AtlasDiscoveryService {
     /**
@@ -79,6 +82,14 @@ public interface AtlasDiscoveryService {
      */
     AtlasSearchResult searchWithParameters(SearchParameters searchParameters) 
throws AtlasBaseException;
 
+    /**
+     * Search for guids of entities matching the search criteria
+     * @param searchParameters Search criteria
+     * @return GUIDs of matching entities
+     * @throws AtlasBaseException
+     */
+    Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType, 
Set<String> entityTypes, SearchParameters searchParameters) throws 
AtlasBaseException;
+
     /**
      * Search for relations (edges) matching the search criteria
      * @param searchParameters Search criteria
@@ -173,4 +184,11 @@ public interface AtlasDiscoveryService {
      * @throws IOException
      */
     AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws 
IOException;
+
+    /**
+     * Creates task to age out audits
+     * @param taskParams parameters of AtlasTask
+     * @return Task created to perform audit aging
+     */
+    AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams, 
String taskType) throws AtlasBaseException;
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
 
b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 5b4395355..9be6517e9 100644
--- 
a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ 
b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -50,6 +50,7 @@ import org.apache.atlas.query.executors.DSLQueryExecutor;
 import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
 import org.apache.atlas.query.executors.TraversalBasedExecutor;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
@@ -60,6 +61,7 @@ import 
org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import 
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory;
 import 
org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
 import 
org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory;
 import org.apache.atlas.repository.userprofile.UserProfileService;
@@ -467,6 +469,12 @@ public class EntityDiscoveryService implements 
AtlasDiscoveryService {
         return searchWithSearchContext(new SearchContext(searchParameters, 
typeRegistry, graph, indexer.getVertexIndexKeys()));
     }
 
+    @Override
+    @GraphTransaction
+    public Set<String> searchGUIDsWithParameters(AtlasAuditAgingType 
auditAgingType, Set<String> entityTypes, SearchParameters searchParameters) 
throws AtlasBaseException {
+        return searchEntityGUIDs(auditAgingType, entityTypes, new 
SearchContext(searchParameters, typeRegistry, graph, 
indexer.getVertexIndexKeys()));
+    }
+
     @Override
     @GraphTransaction
     public void createAndQueueSearchResultDownloadTask(Map<String, Object> 
taskParams) throws AtlasBaseException {
@@ -509,6 +517,19 @@ public class EntityDiscoveryService implements 
AtlasDiscoveryService {
         return result;
     }
 
+    @Override
+    @GraphTransaction
+    public AtlasTask createAndQueueAuditReductionTask(Map<String, Object> 
taskParams, String taskType) throws AtlasBaseException {
+        List<AtlasTask> pendingTasks = 
taskManagement.getPendingTasksByType(taskType);
+        if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > 
AuditReductionTaskFactory.MAX_PENDING_TASKS_ALLOWED) {
+            throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, 
String.valueOf(pendingTasks.size()));
+        }
+        AtlasTask task = taskManagement.createTask(taskType, 
RequestContext.getCurrentUser(), taskParams);
+        RequestContext.get().queueTask(task);
+
+        return task;
+    }
+
     @Override
     @GraphTransaction
     public AtlasSearchResult 
searchRelationsWithParameters(RelationshipSearchParameters searchParameters) 
throws AtlasBaseException {
@@ -549,6 +570,45 @@ public class EntityDiscoveryService implements 
AtlasDiscoveryService {
         return ret;
     }
 
+    public Set<String> searchEntityGUIDs(AtlasAuditAgingType auditAgingType, 
Set<String> entityTypes, SearchContext searchContext) throws AtlasBaseException 
{
+        SearchParameters searchParameters = 
searchContext.getSearchParameters();
+        final QueryParams params          = 
QueryParams.getNormalizedParams(searchParameters.getLimit(),searchParameters.getOffset());
+        String           searchID         = searchTracker.add(searchContext); 
// For future cancellations
+
+        searchParameters.setLimit(params.limit());
+        searchParameters.setOffset(params.offset());
+
+        Set<String> guids = new HashSet<>();
+        try {
+            List<AtlasVertex> resultList = 
searchContext.getSearchProcessor().execute();
+            do {
+                for (AtlasVertex atlasVertex : resultList) {
+                    if (atlasVertex != null && 
checkVertexMatchesSearchCriteria(atlasVertex, auditAgingType, entityTypes)) {
+                        
guids.add(atlasVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
+                    }
+                }
+                searchParameters.setOffset(searchParameters.getOffset() + 
searchParameters.getLimit());
+                resultList = searchContext.getSearchProcessor().execute();
+            } while (CollectionUtils.isNotEmpty(resultList));
+            LOG.info("Total {} entities are eligible for Audit aging", 
guids.size());
+        } catch (Throwable t) {
+            LOG.error("Error while retrieving eligible entities for audit 
aging");
+        } finally {
+            searchTracker.remove(searchID);
+        }
+
+        return guids;
+    }
+
+    private boolean checkVertexMatchesSearchCriteria(AtlasVertex vertex, 
AtlasAuditAgingType auditAgingType, Set<String> entityTypes) {
+        if (CollectionUtils.isEmpty(entityTypes)) {
+            return true;
+        }
+        String typeName = 
vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
+        boolean typeNameMatchesCriteria = entityTypes.contains(typeName);
+        return (auditAgingType == AtlasAuditAgingType.DEFAULT) ? 
!typeNameMatchesCriteria : typeNameMatchesCriteria;
+    }
+
     private AtlasSearchResult searchWithSearchContext(SearchContext 
searchContext) throws AtlasBaseException {
         SearchParameters  searchParameters = 
searchContext.getSearchParameters();
         AtlasSearchResult ret              = new 
AtlasSearchResult(searchParameters);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
index 1aac37577..1b3dd478e 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
@@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.apache.atlas.service.Service;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
@@ -31,10 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * This abstract base class should be used when adding support for an audit 
storage backend.
@@ -162,4 +160,9 @@ public abstract class AbstractStorageBasedAuditRepository 
implements Service, En
     return Bytes.toBytes(keyStr);
   }
 
+  @Override
+  public List<EntityAuditEventV2> deleteEventsV2(String entityId, 
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short 
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, 
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+    return null;
+  }
+
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java
new file mode 100644
index 000000000..3dcfe73fe
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.IntervalTask;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.atlas.repository.Constants.*;
+import static 
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL;
+
+@Component
+public class AtlasAuditReductionService implements SchedulingConfigurer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasAuditReductionService.class);
+
+    private final AtlasGraph            graph;
+    private final AtlasDiscoveryService discoveryService;
+    private final AtlasTypeRegistry     typeRegistry;
+
+    private static final String VALUE_DELIMITER                    = ",";
+
+    private static final String ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES = 
"atlas.audit.sweep.out.entity.types";
+    private static final String ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES = 
"atlas.audit.sweep.out.action.types";
+
+    private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES = 
"atlas.audit.custom.ageout.entity.types";
+    private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES = 
"atlas.audit.custom.ageout.action.types";
+
+    private static final String ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY = 
"atlas.audit.aging.scheduler.initial.delay.in.min";
+
+    private static final int MIN_TTL_TO_MAINTAIN = 
AtlasConfiguration.MIN_TTL_TO_MAINTAIN.getInt();
+    private static final int MIN_AUDIT_COUNT_TO_MAINTAIN = 
AtlasConfiguration.MIN_AUDIT_COUNT_TO_MAINTAIN.getInt();
+
+    private Configuration atlasConfiguration;
+    private AuditReductionCriteria ageoutCriteriaByConfig;
+    private List<Map<String, Object>> ageoutTypeCriteriaMap;
+
+    @Inject
+    public AtlasAuditReductionService(Configuration config, AtlasGraph graph, 
AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+        this.atlasConfiguration = config;
+        this.graph              = graph;
+        this.discoveryService   = discoveryService;
+        this.typeRegistry       = typeRegistry;
+    }
+
+    public List<AtlasTask> startAuditAgingByConfig() {
+        List<AtlasTask> tasks = null;
+        try {
+            if (ageoutCriteriaByConfig == null) {
+                ageoutCriteriaByConfig = 
convertConfigToAuditReductionCriteria();
+                LOG.info("Audit aging is enabled by configuration");
+            }
+            LOG.info("Audit aging is triggered with configuration: {}", 
ageoutCriteriaByConfig.toString());
+
+            if (ageoutTypeCriteriaMap == null) {
+                ageoutTypeCriteriaMap = 
buildAgeoutCriteriaForAllAgingTypes(ageoutCriteriaByConfig);
+            }
+
+            tasks = startAuditAgingByCriteria(ageoutTypeCriteriaMap);
+
+        } catch (Exception e) {
+            LOG.error("Error while aging out audits by configuration: ", e);
+        }
+        return tasks;
+    }
+
+    public List<AtlasTask> startAuditAgingByCriteria(List<Map<String, Object>> 
ageoutTypeCriteriaMap) {
+        if (CollectionUtils.isEmpty(ageoutTypeCriteriaMap)) {
+            return null;
+        }
+        List<AtlasTask> tasks = new ArrayList<>();
+        try {
+            for (Map<String, Object> eachCriteria : ageoutTypeCriteriaMap) {
+                AtlasTask auditAgingTask = 
discoveryService.createAndQueueAuditReductionTask(eachCriteria, 
ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);
+                if (auditAgingTask != null) {
+                    tasks.add(auditAgingTask);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error while aging out audits by criteria: ", e);
+        }
+        return  tasks;
+    }
+
+    private AuditReductionCriteria convertConfigToAuditReductionCriteria() {
+        boolean auditAgingEnabled           = 
AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean();
+        boolean createAuditsAgeoutAllowed   = 
AtlasConfiguration.ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED.getBoolean();
+        boolean subTypesIncluded            = 
AtlasConfiguration.ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED.getBoolean();
+        boolean ignoreDefaultAgeoutTTL      = 
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getBoolean();
+
+        int defaultAgeoutTTLConfigured        = 
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt();
+        int defaultAgeoutAuditCountConfigured = 
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT.getInt();
+
+        int customAgeoutTTLConfigured         = 
AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL.getInt();
+        int customAgeoutAuditCountConfigured  = 
AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT.getInt();
+
+        boolean defaultAgeoutEnabled        = 
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED.getBoolean();
+        int     defaultAgeoutTTL            = 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, 
defaultAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
+        int     defaultAgeoutAuditCount     = 
defaultAgeoutAuditCountConfigured <= 0 ? defaultAgeoutAuditCountConfigured
+                                                : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, 
defaultAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
+
+        int    customAgeoutTTL          = customAgeoutTTLConfigured <= 0 ? 
customAgeoutTTLConfigured
+                                            : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, 
customAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
+        int    customAgeoutAuditCount   = customAgeoutAuditCountConfigured <= 
0 ? customAgeoutAuditCountConfigured
+                                            : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, 
customAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
+        String customAgeoutEntityTypes  = 
getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES);
+        String customAgeoutActionTypes  = 
getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES);
+
+        AuditReductionCriteria auditReductionCriteria = new 
AuditReductionCriteria();
+
+        auditReductionCriteria.setAuditAgingEnabled(auditAgingEnabled);
+        
auditReductionCriteria.setCreateEventsAgeoutAllowed(createAuditsAgeoutAllowed);
+        auditReductionCriteria.setSubTypesIncluded(subTypesIncluded);
+        
auditReductionCriteria.setIgnoreDefaultAgeoutTTL(ignoreDefaultAgeoutTTL);
+
+        auditReductionCriteria.setDefaultAgeoutEnabled(defaultAgeoutEnabled);
+        auditReductionCriteria.setDefaultAgeoutTTLInDays(defaultAgeoutTTL);
+        
auditReductionCriteria.setDefaultAgeoutAuditCount(defaultAgeoutAuditCount);
+
+        auditReductionCriteria.setCustomAgeoutTTLInDays(customAgeoutTTL);
+        
auditReductionCriteria.setCustomAgeoutAuditCount(customAgeoutAuditCount);
+        
auditReductionCriteria.setCustomAgeoutEntityTypes(customAgeoutEntityTypes);
+        
auditReductionCriteria.setCustomAgeoutActionTypes(customAgeoutActionTypes);
+
+        boolean isSweepOutEnabled = 
AtlasConfiguration.ATLAS_AUDIT_SWEEP_OUT.getBoolean();
+        auditReductionCriteria.setAuditSweepoutEnabled(isSweepOutEnabled);
+        if (isSweepOutEnabled) {
+            String sweepoutEntityTypes  = 
getStringOf(ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES);
+            String sweepoutActionTypes  = 
getStringOf(ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES);
+            auditReductionCriteria.setSweepoutEntityTypes(sweepoutEntityTypes);
+            auditReductionCriteria.setSweepoutActionTypes(sweepoutActionTypes);
+        }
+
+        return auditReductionCriteria;
+    }
+
+    public List<Map<String, Object>> 
buildAgeoutCriteriaForAllAgingTypes(AuditReductionCriteria 
auditReductionCriteria) {
+        if (auditReductionCriteria == null || 
!auditReductionCriteria.isAuditAgingEnabled()) {
+            return null;
+        }
+        List<Map<String, Object>> auditAgeoutCriteriaByType         = new 
ArrayList<>();
+        Set<String>               defaultAgeoutEntityTypesToExclude = new 
HashSet<>();
+        Set<String>               defaultAgeoutActionTypes          = 
Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> 
x.toString()).collect(Collectors.toSet());
+
+        boolean createEventsAgeoutAllowed = 
auditReductionCriteria.isCreateEventsAgeoutAllowed();
+        boolean subTypesIncluded          = 
auditReductionCriteria.isSubTypesIncluded();
+        boolean ignoreDefaultAgeoutTTL    = 
auditReductionCriteria.ignoreDefaultAgeoutTTL();
+
+        boolean defaultAgeoutEnabled    = 
auditReductionCriteria.isDefaultAgeoutEnabled();
+        int     defaultAgeoutTTL        = ignoreDefaultAgeoutTTL ? 0 : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, 
auditReductionCriteria.getDefaultAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
+        int     defaultAgeoutAuditCount = 
auditReductionCriteria.getDefaultAgeoutAuditCount() <= 0 ? 
auditReductionCriteria.getDefaultAgeoutAuditCount()
+                                            : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, 
auditReductionCriteria.getDefaultAgeoutAuditCount(), 
MIN_AUDIT_COUNT_TO_MAINTAIN);
+
+        int         customAgeoutTTL         = 
auditReductionCriteria.getCustomAgeoutTTLInDays() <= 0 ? 
auditReductionCriteria.getCustomAgeoutTTLInDays()
+                                                : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, 
auditReductionCriteria.getCustomAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
+        int         customAgeoutAuditCount  = 
auditReductionCriteria.getCustomAgeoutAuditCount() <= 0 ? 
auditReductionCriteria.getCustomAgeoutAuditCount()
+                                                : 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, 
auditReductionCriteria.getCustomAgeoutAuditCount(), 
MIN_AUDIT_COUNT_TO_MAINTAIN);
+        Set<String> customAgeoutEntityTypes = 
getUniqueListOf(auditReductionCriteria.getCustomAgeoutEntityTypes());
+        Set<String> customAgeoutActionTypes = 
getValidActionTypes(AtlasAuditAgingType.CUSTOM, 
getUniqueListOf(auditReductionCriteria.getCustomAgeoutActionTypes()));
+
+        defaultAgeoutEntityTypesToExclude.addAll(customAgeoutEntityTypes);
+        if (CollectionUtils.isEmpty(customAgeoutEntityTypes)) {
+            defaultAgeoutActionTypes.removeAll(customAgeoutActionTypes);
+        }
+
+        boolean isSweepOutEnabled = 
auditReductionCriteria.isAuditSweepoutEnabled();
+        if (isSweepOutEnabled) {
+            Set<String> sweepOutEntityTypes  = 
getUniqueListOf(auditReductionCriteria.getSweepoutEntityTypes());
+            Set<String> sweepOutActionTypes  = 
getValidActionTypes(AtlasAuditAgingType.SWEEP, 
getUniqueListOf(auditReductionCriteria.getSweepoutActionTypes()));
+
+            if (CollectionUtils.isNotEmpty(sweepOutEntityTypes) || 
CollectionUtils.isNotEmpty(sweepOutActionTypes)) {
+                Map<String, Object> sweepAgeoutCriteria = 
getAgeoutCriteriaMap(AtlasAuditAgingType.SWEEP, 0, 0, sweepOutEntityTypes, 
sweepOutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
+                auditAgeoutCriteriaByType.add(sweepAgeoutCriteria);
+            } else {
+                LOG.error("Sweepout of audits is skipped.At least one of two 
properties: entity types/action types should be configured.");
+            }
+
+            defaultAgeoutEntityTypesToExclude.addAll(sweepOutEntityTypes);
+            customAgeoutEntityTypes.removeAll(sweepOutEntityTypes);
+
+            if (CollectionUtils.isEmpty(sweepOutEntityTypes)) {
+                defaultAgeoutActionTypes.removeAll(sweepOutActionTypes);
+                customAgeoutActionTypes.removeAll(sweepOutActionTypes);
+            }
+        }
+
+        if ((customAgeoutTTL > 0 || customAgeoutAuditCount > 0) && 
(CollectionUtils.isNotEmpty(customAgeoutEntityTypes) || 
CollectionUtils.isNotEmpty(customAgeoutActionTypes))) {
+            Map<String, Object> customAgeoutCriteria = 
getAgeoutCriteriaMap(AtlasAuditAgingType.CUSTOM, customAgeoutTTL, 
customAgeoutAuditCount, customAgeoutEntityTypes, customAgeoutActionTypes, 
createEventsAgeoutAllowed, subTypesIncluded);
+            auditAgeoutCriteriaByType.add(customAgeoutCriteria);
+        } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0 && 
CollectionUtils.isEmpty(customAgeoutEntityTypes) && 
CollectionUtils.isEmpty(customAgeoutActionTypes)) {
+            //Do Nothing
+        } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0) {
+            LOG.error("Custom Audit aging is skipped.At least one of two 
properties: TTL/Audit Count should be configured.");
+        } else {
+            LOG.error("Custom Audit aging is skipped.At least one of two 
properties: entity types/action types should be configured.");
+        }
+
+        if (defaultAgeoutEnabled) {
+            if (ignoreDefaultAgeoutTTL) {
+                LOG.info("'{}' config property or 'ignoreDefaultAgeoutTTL' 
property in API configured as: {}, Default audit aging will be done by audit 
count only", 
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getPropertyName(), 
ignoreDefaultAgeoutTTL);
+            }
+
+            /**In case of default ageout with all available audit actions, 
query to ATLAS_ENTITY_AUDIT_EVENTS table
+             * without any action type provides data for all audit actions and 
is more performant than
+             * multiple queries to ATLAS_ENTITY_AUDIT_EVENTS with each action 
type
+             */
+            if (defaultAgeoutActionTypes.size() == 
EntityAuditEventV2.EntityAuditActionV2.values().length) {
+                defaultAgeoutActionTypes.clear();
+            }
+            if (!ignoreDefaultAgeoutTTL || defaultAgeoutAuditCount > 0) {
+                Map<String, Object> defaultAgeoutCriteria = 
getAgeoutCriteriaMap(AtlasAuditAgingType.DEFAULT, defaultAgeoutTTL, 
defaultAgeoutAuditCount, defaultAgeoutEntityTypesToExclude, 
defaultAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
+                auditAgeoutCriteriaByType.add(defaultAgeoutCriteria);
+            } else {
+                LOG.error("Default Audit aging is skipped. Valid audit count 
should be configured when TTL criteria is ignored.");
+            }
+        }
+        return auditAgeoutCriteriaByType;
+    }
+
+    private Map<String, Object> getAgeoutCriteriaMap(AtlasAuditAgingType 
agingOption, int ttl, int minCount, Set<String> entityTypes, Set<String> 
actionTypes, boolean createEventsAgeoutAllowed, boolean subTypesIncluded) {
+        Map<String, Object> auditAgingOptions = new HashMap<>();
+        auditAgingOptions.put(AUDIT_AGING_TYPE_KEY, agingOption);
+        auditAgingOptions.put(AUDIT_AGING_TTL_KEY, ttl);
+        auditAgingOptions.put(AUDIT_AGING_COUNT_KEY, minCount);
+        auditAgingOptions.put(AUDIT_AGING_ENTITY_TYPES_KEY, entityTypes);
+        auditAgingOptions.put(AUDIT_AGING_ACTION_TYPES_KEY, actionTypes);
+        auditAgingOptions.put(CREATE_EVENTS_AGEOUT_ALLOWED_KEY, 
createEventsAgeoutAllowed);
+        auditAgingOptions.put(AUDIT_AGING_SUBTYPES_INCLUDED_KEY, 
subTypesIncluded);
+
+        return auditAgingOptions;
+    }
+
+    private int getGuaranteedMinValueOf(AtlasConfiguration configuration, int 
configuredValue, int minValueToMaintain) {
+        if (configuredValue < minValueToMaintain) {
+            LOG.info("Minimum value for '{}' should be {}", 
configuration.getPropertyName(), minValueToMaintain);
+        }
+        return configuredValue < minValueToMaintain ? minValueToMaintain : 
configuredValue;
+    }
+
+    private String getStringOf(String configProperty) {
+        String  configuredValue       = null;
+
+        if (StringUtils.isNotEmpty(configProperty)) {
+            configuredValue = String.join(VALUE_DELIMITER , (List) 
atlasConfiguration.getList(configProperty));
+        }
+
+        return configuredValue;
+    }
+
+    private Set<String> getUniqueListOf(String value) {
+        Set<String>  configuredValues       = null;
+
+        if (StringUtils.isNotEmpty(value)) {
+            configuredValues = 
Stream.of(value.split(VALUE_DELIMITER)).map(String::trim).collect(Collectors.toSet());
+        }
+
+        return configuredValues == null ? new HashSet<>() : configuredValues;
+    }
+
+    private Set<String> getValidActionTypes(AtlasAuditAgingType 
auditAgingType, Set<String> actionTypes) {
+        if (CollectionUtils.isEmpty(actionTypes)) {
+            return Collections.emptySet();
+        }
+        Set<String> allActionTypes     = 
Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> 
x.toString()).collect(Collectors.toSet());
+        Set<String> entityAuditActions = new HashSet<>();
+        Set<String> invalidActionTypes = new HashSet<>();
+
+        for (String actionType : actionTypes) {
+            Set<String> matchedActionTypes;
+            final String actionTypeToMatch = actionType.contains("*") ? 
actionType.replace("*", "") : actionType;
+            if (actionTypeToMatch.startsWith("*")) {
+                matchedActionTypes = allActionTypes.stream().filter(x -> 
x.contains(actionTypeToMatch)).collect(Collectors.toSet());
+            } else {
+                matchedActionTypes = allActionTypes.stream().filter(x -> 
x.startsWith(actionTypeToMatch)).collect(Collectors.toSet());
+            }
+
+            if (CollectionUtils.isEmpty(matchedActionTypes)) {
+                invalidActionTypes.add(actionType);
+            } else {
+                entityAuditActions.addAll(matchedActionTypes);
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(actionTypes) && 
CollectionUtils.isEmpty(entityAuditActions)) {
+            throw new IllegalArgumentException("No enum constant " + 
EntityAuditEventV2.EntityAuditActionV2.class.getCanonicalName() + "." + 
String.join(VALUE_DELIMITER, invalidActionTypes));
+        } else {
+            LOG.info("Action type name(s) {} provided for aging type-{}", 
String.join(VALUE_DELIMITER, entityAuditActions), auditAgingType);
+        }
+
+        if (CollectionUtils.isNotEmpty(invalidActionTypes)){
+            LOG.warn("Invalid action type name(s) {} provided for aging 
type-{}", String.join(VALUE_DELIMITER, invalidActionTypes), auditAgingType);
+        }
+
+        return entityAuditActions;
+    }
+
+    @Override
+    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+        if (!AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean()) {
+            LOG.warn("Audit aging is not enabled");
+            return;
+        }
+        IntervalTask task = new IntervalTask(new Runnable() {
+            @Override
+            public void run() {
+                startAuditAgingByConfig();
+            }
+        }, getAuditAgingFrequencyInMillis(), 
getAuditAgingInitialDelayInMillis());
+
+        taskRegistrar.addFixedRateTask(task);
+    }
+
+    private long getAuditAgingFrequencyInMillis() {
+        int frequencyInDays = 
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY,
 AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY.getInt(), 1);
+        return frequencyInDays * DateUtils.MILLIS_PER_DAY;
+    }
+
+    private long getAuditAgingInitialDelayInMillis() {
+        int initialDelayInMins = 1;
+        try {
+            initialDelayInMins = 
ApplicationProperties.get().getInt(ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY, 
1);
+        } catch (AtlasException ex) {
+            LOG.error("Error while fetching application properties", ex);
+        }
+        return (initialDelayInMins < 1 ? 1 : initialDelayInMins) * 
DateUtils.MILLIS_PER_MINUTE;
+    }
+}
\ No newline at end of file
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index bad2a89fc..1fdacab29 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 
 import java.util.List;
 import java.util.Set;
@@ -92,6 +93,17 @@ public interface EntityAuditRepository {
      */
     List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, 
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException;
 
+    /**
+     * Delete all events for the given entity id by keeping only auditCount 
number of events with entityAuditActions
+     * @param entityId entity id
+     * @param entityAuditActions operation(s) to be used to filter at HBase 
column
+     * @param auditCount  Max numbers of events to keep without deleting
+     * @param ttlInDays  time-to-live of events
+     * @return list of events
+     * @throws AtlasBaseException
+     */
+    List<EntityAuditEventV2> deleteEventsV2(String entityId, 
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short 
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, 
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException;
+
     /***
      * List events for given time range where classifications have been added, 
deleted or updated.
      * @param fromTimestamp from timestamp
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index a99f9b383..b7f0dd5e2 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.audit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.RequestContext;
@@ -28,10 +29,12 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.DateUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -63,14 +68,18 @@ import org.springframework.core.annotation.Order;
 import javax.inject.Singleton;
 import java.io.Closeable;
 import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 
 /**
@@ -329,8 +338,13 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
 
     @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, 
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+        return listEventsV2(entityId, auditAction, sortByColumn, 
sortOrderDesc, offset, limit, false, true, false, null);
+    }
+
+    private List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, 
boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction, 
boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount, 
List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, 
auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", 
entityId, auditAction, sortByColumn, offset, limit);
+            LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, 
auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
+                    entityId, auditAction, sortByColumn, sortOrderDesc, 
offset, limit);
         }
 
         AtlasPerfMetrics.MetricRecorder metric = 
RequestContext.get().startMetricRecord("listEventsV2");
@@ -343,7 +357,7 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
             offset = 0;
         }
 
-        if (limit < 0) {
+        if (!isAgeoutTransaction && limit < 0) {
             limit = 100;
         }
 
@@ -356,19 +370,29 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
              * MultiRowRangeFilter and then again scan the table to get all 
the columns from the table this time.
              */
             Scan scan = new Scan().setReversed(true)
-                                  .setCaching(DEFAULT_CACHING)
-                                  .setSmall(true)
-                                  .setStopRow(Bytes.toBytes(entityId))
-                                  .setStartRow(getKey(entityId, 
Long.MAX_VALUE, Integer.MAX_VALUE))
-                                  .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
-                                  .addColumn(COLUMN_FAMILY, COLUMN_USER);
-
+                    .setCaching(DEFAULT_CACHING)
+                    .setSmall(true)
+                    .setStopRow(Bytes.toBytes(entityId))
+                    .setStartRow(getKey(entityId, Long.MAX_VALUE, 
Integer.MAX_VALUE))
+                    .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
+                    .addColumn(COLUMN_FAMILY, COLUMN_USER);
+            FilterList filterList = new FilterList();
             if (auditAction != null) {
                 Filter filterAction = new 
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, 
CompareFilter.CompareOp.EQUAL, new 
BinaryComparator(Bytes.toBytes(auditAction.toString())));
+                filterList.addFilter(filterAction);
+            }
 
-                scan.setFilter(filterAction);
+            if (!createEventsAgeoutAllowed) {
+                FilterList createEventFilterList          = new 
FilterList(FilterList.Operator.MUST_PASS_ALL);
+                Filter     filterByCreateActionType       = new 
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, 
CompareFilter.CompareOp.NOT_EQUAL, new 
BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString())));
+                Filter     filterByImportCreateActionType = new 
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, 
CompareFilter.CompareOp.NOT_EQUAL, new 
BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString())));
+                createEventFilterList.addFilter(filterByCreateActionType);
+                
createEventFilterList.addFilter(filterByImportCreateActionType);
+                filterList.addFilter(createEventFilterList);
             }
 
+            scan.setFilter(filterList);
+
             List<EntityAuditEventV2> events = new ArrayList<>();
 
             try (ResultScanner scanner = table.getScanner(scan)) {
@@ -383,10 +407,25 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
             }
 
             EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+            int fromIndex = Math.min(events.size(), offset);
+            int endIndex = events.size();
+            if (limit > 0) {
+                endIndex = Math.min(events.size(), offset + limit);
+            }
+
+            if (isAgeoutTransaction) {
+                if (!allowAgeoutByAuditCount) { //No audit events allowed to 
age-out by audit count
+                    eventsToKeep.addAll(events);
+                    return Collections.emptyList();
+                }
+                eventsToKeep.addAll(events.subList(0, fromIndex));
+            }
+
+            events = events.subList(fromIndex, endIndex);
 
-            events = events.subList(Math.min(events.size(), offset), 
Math.min(events.size(), offset + limit));
 
             if (events.size() > 0) {
+
                 List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
 
                 events.forEach(e -> {
@@ -394,11 +433,11 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
                 });
 
                 scan = new Scan().setReversed(true)
-                                 .setCaching(DEFAULT_CACHING)
-                                 .setSmall(true)
-                                 .setStopRow(Bytes.toBytes(entityId))
-                                 .setStartRow(getKey(entityId, Long.MAX_VALUE, 
Integer.MAX_VALUE))
-                                 .setFilter(new MultiRowRangeFilter(ranges));
+                        .setCaching(DEFAULT_CACHING)
+                        .setSmall(true)
+                        .setStopRow(Bytes.toBytes(entityId))
+                        .setStartRow(getKey(entityId, Long.MAX_VALUE, 
Integer.MAX_VALUE))
+                        .setFilter(new MultiRowRangeFilter(ranges));
 
                 try (ResultScanner scanner = table.getScanner(scan)) {
                     events = new ArrayList<>();
@@ -426,7 +465,8 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("<== 
HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, 
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #recored returned {}", 
entityId, auditAction, sortByColumn, offset, limit, events.size());
+                LOG.debug("<== 
HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, 
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}",
+                        entityId, auditAction, sortByColumn, sortOrderDesc, 
offset, limit, events.size());
             }
 
             return events;
@@ -452,6 +492,88 @@ public class HBaseBasedAuditRepository extends 
AbstractStorageBasedAuditReposito
         return ret;
     }
 
+    @Override
+    public List<EntityAuditEventV2> deleteEventsV2(String entityId, 
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short 
allowedAuditCount, int ttlInDays, boolean createEventsAgeoutAllowed, 
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+        final String  SORT_BY_COLUMN  = 
EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+        final boolean SORT_ORDER_DESC = true;
+
+        if (LOG.isDebugEnabled()) {
+        LOG.debug("==> HBaseBasedAuditRepository.deleteEventsV2(entityId={}, 
auditActions={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
+                entityId, Arrays.toString(entityAuditActions.toArray()), 
SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
+        }
+
+        AtlasPerfMetrics.MetricRecorder metric = 
RequestContext.get().startMetricRecord("deleteEventsV2");
+
+        Table                    table   = null;
+        List<EntityAuditEventV2> eventsEligibleForAgeout  = new ArrayList<>();
+        try {
+            table = connection.getTable(tableName);
+            List<EntityAuditEventV2> eventsToKeep = new ArrayList<>();
+            boolean allowAgeoutByAuditCount = allowedAuditCount > 0 || 
(auditAgingType == AtlasAuditAgingType.SWEEP);
+            if (CollectionUtils.isEmpty(entityAuditActions)) {
+                eventsEligibleForAgeout.addAll(listEventsV2(entityId, null, 
SORT_BY_COLUMN, SORT_ORDER_DESC,
+                                                    allowedAuditCount, (short) 
-1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep));
+            } else {
+                for (EntityAuditActionV2 eachAuditAction : entityAuditActions) 
{
+                    List<EntityAuditEventV2> eventsByEachAuditAction = 
listEventsV2(entityId, eachAuditAction, SORT_BY_COLUMN, SORT_ORDER_DESC,
+                                                    allowedAuditCount, (short) 
-1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep);
+
+                    if (CollectionUtils.isNotEmpty(eventsByEachAuditAction)) {
+                        
eventsEligibleForAgeout.addAll(eventsByEachAuditAction);
+                    }
+                }
+            }
+
+            if (CollectionUtils.isNotEmpty(eventsToKeep)) {
+                //Limit events based on configured audit count by grouping 
events of all action types
+                if (allowAgeoutByAuditCount && (auditAgingType == 
AtlasAuditAgingType.DEFAULT || CollectionUtils.isEmpty(entityAuditActions))) {
+                    LOG.debug("Aging out audit events by audit count for 
entity: {}", entityId);
+                    EntityAuditEventV2.sortEvents(eventsToKeep, 
SORT_BY_COLUMN, SORT_ORDER_DESC);
+                    if (allowedAuditCount < eventsToKeep.size()) {
+                        
eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount, 
eventsToKeep.size()));
+                        eventsToKeep = eventsToKeep.subList(0, 
allowedAuditCount);
+                    }
+                }
+                //TTL based aging
+                LocalDateTime now = LocalDateTime.now();
+                boolean isTTLTestAutomation = 
AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean();
+                if (ttlInDays > 0) {
+                    LOG.debug("Aging out audit events by TTL for entity: {}", 
entityId);
+                    long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation 
? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime();
+                    eventsToKeep.forEach(e -> {
+                        if (e.getTimestamp() < ttlTimestamp) {
+                            eventsEligibleForAgeout.add(e);
+                        }
+                    });
+                }
+            }
+
+            List<Delete> eventsToDelete = new ArrayList<>();
+            for (EntityAuditEventV2 event : eventsEligibleForAgeout) {
+                Delete delete = new Delete(Bytes.toBytes(event.getEventKey()));
+                eventsToDelete.add(delete);
+            }
+
+            if (CollectionUtils.isNotEmpty(eventsToDelete)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Deleting events from table:{} are {}", 
tableName, Arrays.toString(eventsToDelete.toArray()));
+                }
+                table.delete(eventsToDelete);
+            }
+
+            if (LOG.isDebugEnabled()) {
+            LOG.debug("<== 
HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditAction={}, 
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): ",
+                    entityId, Arrays.toString(entityAuditActions.toArray()), 
SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed deleting audit events for guid:{}", entityId);
+        } finally {
+            RequestContext.get().endMetricRecord(metric);
+            close(table);
+        }
+        return eventsEligibleForAgeout;
+    }
+
     private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
         if (columnValue != null && !columnValue.toString().isEmpty()) {
             put.addColumn(COLUMN_FAMILY, columnName, 
Bytes.toBytes(columnValue.toString()));
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index 51efff1d3..6d31e9727 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -23,10 +23,13 @@ import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -103,17 +106,59 @@ public class InMemoryEntityAuditRepository implements 
EntityAuditRepository {
 
     @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, 
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+        return listEventsV2(entityId, auditAction, sortByColumn, 
sortOrderDesc, 0, offset, limit, true, true);
+    }
+
+    private List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, 
boolean sortOrderDesc, int ttlInDays, int offset, short limit, boolean 
allowMaxResults, boolean createEventsAgeoutAllowed) throws AtlasBaseException {
         List<EntityAuditEventV2> events     = new ArrayList<>();
         SortedMap<String, EntityAuditEventV2> subMap = 
auditEventsV2.tailMap(entityId);
         for (EntityAuditEventV2 event : subMap.values()) {
             if (event.getEntityId().equals(entityId)) {
-                events.add(event);
+                if (auditAction == null || event.getAction() == auditAction) {
+                    if (event.getAction() == 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE && 
!createEventsAgeoutAllowed) {
+                        continue;
+                    }
+                    events.add(event);
+                }
             }
         }
+
+        if (allowMaxResults && limit == -1) {
+            limit = (short) events.size();
+        }
         EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
-        events = events.subList(
-                Math.min(events.size(), offset),
-                Math.min(events.size(), offset + limit));
+        int fromIndex = Math.min(events.size(), offset);
+        int endIndex  = Math.min(events.size(), offset + limit);
+
+        List<EntityAuditEventV2> possibleExpiredEvents = events.subList(0, 
fromIndex);
+
+        events = new ArrayList<>(events.subList(fromIndex, endIndex));
+
+        // This is only for Audit Aging, including expired audit events to 
result
+        if (CollectionUtils.isNotEmpty(possibleExpiredEvents) && ttlInDays > 0 
) {
+            LocalDateTime now = LocalDateTime.now();
+            long ttlTimestamp = 
Timestamp.valueOf(now.minusDays(ttlInDays)).getTime();
+            possibleExpiredEvents.removeIf(e -> (auditAction!= null && 
e.getAction() != auditAction) || e.getTimestamp() > ttlTimestamp);
+            if (CollectionUtils.isNotEmpty(possibleExpiredEvents)) {
+                events.addAll(possibleExpiredEvents);
+            }
+        }
+        return events;
+    }
+
+    @Override
+    public List<EntityAuditEventV2> deleteEventsV2(String entityId, 
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short 
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, 
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+        List<EntityAuditEventV2> events = new ArrayList<>();
+        if (CollectionUtils.isEmpty(entityAuditActions)) {
+            events = listEventsV2(entityId, null, "timestamp", true, 
ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed);
+        } else {
+            for (EntityAuditEventV2.EntityAuditActionV2 auditAction : 
entityAuditActions) {
+                List<EntityAuditEventV2> eventsByAction = 
listEventsV2(entityId, auditAction, "timestamp", true, ttlInDays, auditCount, 
(short) -1, true, createEventsAgeoutAllowed);
+                if (CollectionUtils.isNotEmpty(eventsByAction)) {
+                    events.addAll(eventsByAction);
+                }
+            }
+        }
         return events;
     }
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index c62bb7eaa..c357c0f59 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -18,10 +18,12 @@
 
 package org.apache.atlas.repository.audit;
 
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
@@ -67,6 +69,11 @@ public class NoopEntityAuditRepository implements 
EntityAuditRepository {
         return Collections.emptyList();
     }
 
+    @Override
+    public List<EntityAuditEventV2> deleteEventsV2(String entityId, 
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short 
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, 
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+        return null;
+    }
+
     @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, 
EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short 
maxResultCount) {
         return Collections.emptyList();
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 5780f2ea0..01aa22f69 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -398,6 +398,9 @@ public class GraphBackedSearchIndexer implements 
SearchIndexer, ActiveStateChang
             // index recovery
             createCommonVertexIndex(management, 
PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, 
SINGLE, true, false);
 
+            // audit reduction
+            createCommonVertexIndex(management, 
PROPERTY_KEY_AUDIT_REDUCTION_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, 
SINGLE, true, false);
+
             //metrics
             createCommonVertexIndex(management," 
__AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, 
true, false);
             createCommonVertexIndex(management," 
__AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, 
SINGLE, true, false);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
new file mode 100644
index 000000000..e53416f6b
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.*;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.Constants.*;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static 
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
+import static 
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION;
+
+
+public class AuditReductionEntityRetrievalTask extends AbstractTask {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuditReductionEntityRetrievalTask.class);
+
+    private static final String VALUE_DELIMITER = ",";
+
+    private final AtlasDiscoveryService discoveryService;
+    private final AtlasTypeRegistry     typeRegistry;
+    private final AtlasGraph            graph;
+
+    public AuditReductionEntityRetrievalTask(AtlasTask task, AtlasGraph graph, 
AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+        super(task);
+        this.graph            = graph;
+        this.discoveryService = discoveryService;
+        this.typeRegistry     = typeRegistry;
+    }
+
+    @Override
+    public AtlasTask.Status perform() throws Exception {
+        RequestContext.clear();
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: Unable to process task: Parameters is not 
readable!", getTaskGuid());
+            return FAILED;
+        }
+
+        String userName = getTaskDef().getCreatedBy();
+
+        if (StringUtils.isEmpty(userName)) {
+            LOG.warn("Task: {}: Unable to process task as user name is 
empty!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        RequestContext.get().setUser(userName, null);
+
+        try {
+            run(params);
+
+            setStatus(COMPLETE);
+        } catch (Exception e) {
+            LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+            setStatus(FAILED);
+
+            throw e;
+        } finally {
+            RequestContext.clear();
+        }
+        return getStatus();
+    }
+
+    protected void run(Map<String, Object> parameters) throws 
AtlasBaseException, IOException, AtlasException {
+        try {
+            AtlasTask auditAgingTask = 
createAgingTaskWithEligibleGUIDs(parameters);
+            if (auditAgingTask != null) {
+                LOG.info("{} task created for audit aging type-{}", 
ATLAS_AUDIT_REDUCTION, parameters.get(AUDIT_AGING_TYPE_KEY));
+            }
+        } catch (Exception e) {
+            LOG.error("Error while retrieving entities eligible for audit 
aging and creating audit aging tasks", e.getMessage());
+        }
+    }
+
+    protected AtlasTask createAgingTaskWithEligibleGUIDs(Map<String, Object> 
parameters) throws AtlasBaseException {
+        final String ALL_ENTITY_TYPES = "_ALL_ENTITY_TYPES";
+        final int    SEARCH_OFFSET    = 0;
+        final int    SEARCH_LIMIT     = 
AtlasConfiguration.ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT.getInt();
+
+        Set<String>         entityTypes      = ((Collection<String>) 
parameters.get(AUDIT_AGING_ENTITY_TYPES_KEY)).stream().collect(Collectors.toSet());
+        AtlasAuditAgingType auditAgingType   = 
(AtlasAuditAgingType)parameters.get(AUDIT_AGING_TYPE_KEY);
+        boolean             subTypesIncluded = 
(boolean)parameters.get(AUDIT_AGING_SUBTYPES_INCLUDED_KEY);
+
+        SearchParameters searchEntitiesToReduceAudit = new SearchParameters();
+        searchEntitiesToReduceAudit.setTypeName(ALL_ENTITY_TYPES);
+        searchEntitiesToReduceAudit.setOffset(SEARCH_OFFSET);
+        searchEntitiesToReduceAudit.setLimit(SEARCH_LIMIT);
+        searchEntitiesToReduceAudit.setIncludeSubTypes(subTypesIncluded);
+
+        if (CollectionUtils.isNotEmpty(entityTypes)) {
+            if (!validateTypesAndIncludeSubTypes(entityTypes, auditAgingType, 
subTypesIncluded)) {
+                LOG.error("All entity type names provided for audit aging 
type-{} are invalid", auditAgingType);
+                return null;
+            }
+
+            String queryString = String.join(VALUE_DELIMITER, entityTypes);
+            if (auditAgingType == AtlasAuditAgingType.DEFAULT && 
StringUtils.isNotEmpty(queryString)) {
+                queryString = new 
StringBuilder().append("!").append(queryString).toString();
+            }
+            searchEntitiesToReduceAudit.setQuery(queryString);
+        }
+
+        LOG.info("Getting GUIDs eligible for Audit aging type-{} with 
SearchParameters: {}", auditAgingType.toString(), 
searchEntitiesToReduceAudit.toString());
+
+        Set<String> guids                = 
discoveryService.searchGUIDsWithParameters(auditAgingType, entityTypes, 
searchEntitiesToReduceAudit);
+
+        AtlasVertex  auditReductionVertex = getOrCreateVertex();
+
+        AtlasTask ageoutTask = 
updateVertexWithGuidsAndCreateAgingTask(auditReductionVertex, 
AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType), guids, parameters);
+
+        /** For DEFAULT audit aging, "entityTypes" should be excluded from 
_ALL_ENTITY_TYPES i.e., negating the queryString
+         *  Including AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY to indicate the 
same in AtlasTask response to user
+         */
+        if (ageoutTask != null) {
+            if (auditAgingType == AtlasAuditAgingType.DEFAULT && 
CollectionUtils.isNotEmpty(entityTypes)) {
+                
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, true);
+            } else {
+                
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, false);
+            }
+        }
+
+        return ageoutTask;
+    }
+
+    private boolean validateTypesAndIncludeSubTypes(Set<String> entityTypes, 
AtlasAuditAgingType auditAgingType, boolean subTypesIncluded) throws 
AtlasBaseException {
+        Collection<String> allEntityTypeNames     = 
typeRegistry.getAllEntityDefNames();
+        Set<String>        entityTypesToSearch    = new HashSet<>();
+        Set<String>        invalidEntityTypeNames = new HashSet<>();
+
+        entityTypes.stream().forEach(entityType -> {
+            if (entityType.endsWith("*")) {
+                String suffix = entityType.replace("*", "");
+                
entityTypesToSearch.addAll(allEntityTypeNames.stream().filter(e -> 
e.startsWith(suffix)).collect(Collectors.toSet()));
+            } else if (allEntityTypeNames.contains(entityType)) {
+                entityTypesToSearch.add(entityType);
+            } else {
+                invalidEntityTypeNames.add(entityType);
+            }
+        });
+
+        if (auditAgingType != AtlasAuditAgingType.DEFAULT) {
+            if (CollectionUtils.isNotEmpty(invalidEntityTypeNames)) {
+                LOG.warn("Invalid entity type name(s) {} provided for aging 
type-{}", String.join(VALUE_DELIMITER, invalidEntityTypeNames), auditAgingType);
+            }
+
+            if (CollectionUtils.isEmpty(entityTypesToSearch)) {
+                return false;
+            }
+        }
+
+        entityTypes.clear();
+        entityTypes.addAll(subTypesIncluded ? 
AtlasEntityType.getEntityTypesAndAllSubTypes(entityTypesToSearch, typeRegistry) 
: entityTypesToSearch);
+
+        return true;
+    }
+
+    @GraphTransaction
+    private AtlasTask updateVertexWithGuidsAndCreateAgingTask(AtlasVertex 
vertex, String vertexProperty, Set<String> guids, Map<String, Object> params) 
throws AtlasBaseException {
+        List<String> guidsEligibleForAuditReduction = 
vertex.getProperty(vertexProperty, List.class);
+        if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction) && 
CollectionUtils.isEmpty(guids)) {
+            return null;
+        }
+
+        if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction)) {
+            guidsEligibleForAuditReduction = new ArrayList<>();
+        }
+
+        if (CollectionUtils.isNotEmpty(guids)) {
+            guidsEligibleForAuditReduction.addAll(guids);
+            setEncodedProperty(vertex, vertexProperty, 
guidsEligibleForAuditReduction);
+        }
+
+        return discoveryService.createAndQueueAuditReductionTask(params, 
ATLAS_AUDIT_REDUCTION);
+    }
+
+    private AtlasVertex getOrCreateVertex() {
+
+        AtlasGraphQuery query   = 
graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        AtlasVertex auditReductionVertex = results.hasNext() ? results.next() 
: null;
+
+        if (auditReductionVertex == null) {
+            auditReductionVertex = graph.addVertex();
+            setEncodedProperty(auditReductionVertex, 
PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+        }
+        return auditReductionVertex;
+    }
+
+}
\ No newline at end of file
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
new file mode 100644
index 000000000..e8d7f6806
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.Constants.*;
+
+import static 
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
+
+
+public class AuditReductionTask extends AbstractTask {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuditReductionTask.class);
+
+    private static final int GUID_BATCH_SIZE_PER_AGE_OUT_TASK = 100;
+
+    private final EntityAuditRepository auditRepository;
+    private final AtlasGraph            graph;
+
+    public AuditReductionTask(AtlasTask task, EntityAuditRepository 
auditRepository, AtlasGraph graph) {
+        super(task);
+        this.auditRepository  = auditRepository;
+        this.graph            = graph;
+    }
+
+    @Override
+    public AtlasTask.Status perform() throws Exception {
+        RequestContext.clear();
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: Unable to process task: Parameters is not 
readable!", getTaskGuid());
+            return FAILED;
+        }
+
+        String userName = getTaskDef().getCreatedBy();
+
+        if (StringUtils.isEmpty(userName)) {
+            LOG.warn("Task: {}: Unable to process task as user name is 
empty!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        RequestContext.get().setUser(userName, null);
+
+        try {
+            run(params);
+
+            setStatus(COMPLETE);
+        } catch (Exception e) {
+            LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+            setStatus(FAILED);
+
+            throw e;
+        } finally {
+            RequestContext.clear();
+        }
+        return getStatus();
+    }
+
+    protected void run(Map<String, Object> parameters) throws 
AtlasBaseException, IOException, AtlasException {
+        AtlasVertex vertex = findVertex();
+
+        if (vertex == null) {
+            return;
+        }
+
+        Map<String, List<EntityAuditEventV2>> entitiesWithSucceededAgeout = 
new HashMap<>();
+
+        AtlasAuditAgingType auditAgingType              = 
AtlasAuditAgingType.valueOf(String.valueOf(parameters.get(AUDIT_AGING_TYPE_KEY)));
+        Set<String>         actionTypes                 = 
((Collection<String>) 
parameters.get(AUDIT_AGING_ACTION_TYPES_KEY)).stream().collect(Collectors.toSet());
+        int                 auditCountInput             = (int) 
parameters.get(AUDIT_AGING_COUNT_KEY);
+        short               auditCount                  = auditCountInput > 
Short.MAX_VALUE ? Short.MAX_VALUE : auditCountInput < Short.MIN_VALUE ? 
Short.MIN_VALUE : (short)auditCountInput;
+        int                 ttl                         = (int) 
parameters.get(AUDIT_AGING_TTL_KEY);
+        boolean             createEventsAgeoutAllowed   = (boolean) 
parameters.get(CREATE_EVENTS_AGEOUT_ALLOWED_KEY);
+        String              vertexPropertyKeyForGuids   = 
AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType);
+
+        List<String>        entityGuidsEligibleForAuditAgeout  = 
vertex.getProperty(vertexPropertyKeyForGuids, List.class);
+        int                 guidsCount                         = 
CollectionUtils.isNotEmpty(entityGuidsEligibleForAuditAgeout) ? 
entityGuidsEligibleForAuditAgeout.size() : 0;
+        int                 batchIndex                         = 1;
+
+        Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions      = 
actionTypes.stream().map(x -> 
EntityAuditEventV2.EntityAuditActionV2.fromString(x)).collect(Collectors.toSet());
+
+        for (int startIndex = 0; startIndex < guidsCount; ) {
+            int          endIndex   = startIndex + 
GUID_BATCH_SIZE_PER_AGE_OUT_TASK < guidsCount ? startIndex + 
GUID_BATCH_SIZE_PER_AGE_OUT_TASK : guidsCount;
+            List<String> guidsBatch = 
entityGuidsEligibleForAuditAgeout.subList(startIndex, endIndex);
+
+            for (String guid : guidsBatch) {
+                List<EntityAuditEventV2> deletedAuditEvents = 
auditRepository.deleteEventsV2(guid,  entityAuditActions, auditCount, ttl, 
createEventsAgeoutAllowed, auditAgingType);
+                entitiesWithSucceededAgeout.put(guid, deletedAuditEvents);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} Audit aging completed for batch-{} with guids: 
{}", auditAgingType.toString(), batchIndex, 
Arrays.toString(entitiesWithSucceededAgeout.keySet().toArray()));
+            }
+
+            entitiesWithSucceededAgeout.clear();
+            startIndex = endIndex;
+            batchIndex++;
+            List<String> remainingGuids = startIndex < guidsCount ? new 
ArrayList<>(entityGuidsEligibleForAuditAgeout.subList(startIndex, guidsCount)) 
: null;
+            vertex.setProperty(vertexPropertyKeyForGuids, remainingGuids);
+        }
+
+
+    }
+
+    public AtlasVertex findVertex() {
+        AtlasGraphQuery       query   = 
graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? results.next() : null;
+    }
+}
\ No newline at end of file
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
new file mode 100644
index 000000000..7e6aca1ab
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.tasks.TaskFactory;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT;
+import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM;
+import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_SWEEPOUT;
+
+
+@Component
+public class AuditReductionTaskFactory implements TaskFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuditReductionTaskFactory.class);
+
+    private static Configuration configuration;
+    public  static final int     MAX_PENDING_TASKS_ALLOWED;
+    private static final int     MAX_PENDING_TASKS_ALLOWED_DEFAULT      = 50;
+    private static final String  MAX_PENDING_TASKS_ALLOWED_KEY          = 
"atlas.audit.reduction.max.pending.tasks";
+    public  static final String  ATLAS_AUDIT_REDUCTION                  = 
"ATLAS_AUDIT_REDUCTION";
+    public  static final String  ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL = 
"AUDIT_REDUCTION_ENTITY_RETRIEVAL";
+
+    static {
+        try {
+            configuration = ApplicationProperties.get();
+        } catch (Exception e) {
+            LOG.info("Failed to load application properties", e);
+        }
+        if (configuration != null) {
+            MAX_PENDING_TASKS_ALLOWED   = 
configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, 
MAX_PENDING_TASKS_ALLOWED_DEFAULT);
+        } else {
+            MAX_PENDING_TASKS_ALLOWED   = MAX_PENDING_TASKS_ALLOWED_DEFAULT;
+        }
+    }
+
+    private static final List<String> supportedTypes = new ArrayList<String>() 
{{
+        add(ATLAS_AUDIT_REDUCTION);
+        add(ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);
+    }};
+
+    public static final Map<AtlasAuditAgingType, String> 
AGING_TYPE_PROPERTY_KEY_MAP = new HashMap<AtlasAuditAgingType, String>() {
+        {
+            put(AtlasAuditAgingType.DEFAULT, 
PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT);
+            put(AtlasAuditAgingType.SWEEP, PROPERTY_KEY_GUIDS_TO_SWEEPOUT);
+            put(AtlasAuditAgingType.CUSTOM, 
PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM);
+        }
+    };
+
+    private final EntityAuditRepository auditRepository;
+    private final AtlasGraph            graph;
+    private final AtlasDiscoveryService discoveryService;
+    private final AtlasTypeRegistry     typeRegistry;
+
+    @Inject
+    public AuditReductionTaskFactory(EntityAuditRepository auditRepository, 
AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry 
typeRegistry) {
+        this.auditRepository  = auditRepository;
+        this.graph            = graph;
+        this.discoveryService = discoveryService;
+        this.typeRegistry     = typeRegistry;
+    }
+
+    @Override
+    public AbstractTask create(AtlasTask task) {
+        String taskType = task.getType();
+        String taskGuid = task.getGuid();
+        switch (taskType) {
+            case ATLAS_AUDIT_REDUCTION:
+                return new AuditReductionTask(task, auditRepository, graph);
+
+            case ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL:
+                return new AuditReductionEntityRetrievalTask(task, graph, 
discoveryService, typeRegistry);
+
+            default:
+                LOG.warn("Type: {} - {} not found!. The task will be 
ignored.", taskType, taskGuid);
+                return null;
+        }
+    }
+
+    @Override
+    public List<String> getSupportedTypes() {
+        return this.supportedTypes;
+    }
+}
\ No newline at end of file
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
 
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index 679df3c5c..24a8bda23 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -22,15 +22,20 @@ import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
 import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang.time.DateUtils;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.testng.Assert.*;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 public class AuditRepositoryTestBase {
     protected EntityAuditRepository eventRepository;
 
@@ -203,4 +208,62 @@ public class AuditRepositoryTestBase {
         assertEquals(actual.getDetails(), expected.getDetails());
     }
 
+    @Test
+    public void testDeleteEventsV2() throws Exception {
+        String      id1       = "id1" + rand();
+        int         ttlInDays = 1;
+        long        ts        = System.currentTimeMillis() - (ttlInDays * 
DateUtils.MILLIS_PER_DAY);
+        AtlasEntity entity    = new AtlasEntity(rand());
+
+        int j = 0;
+        List<EntityAuditEventV2> expectedEvents = new ArrayList<>();
+        expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-a", 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details" + j, entity));
+        expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-C", 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, entity));
+        for (int i = 0; i < 5; i++) {
+            expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + 
i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, 
entity));
+            expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + 
i, EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD, "details" + j, 
entity));
+        }
+        expectedEvents.add(new EntityAuditEventV2(id1, ts+ j++, "User-b", 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j, entity));
+        for(EntityAuditEventV2 event : expectedEvents) {
+            eventRepository.putEventsV2(event);
+        }
+
+        List<EntityAuditEventV2> events = eventRepository.listEventsV2(id1, 
null, "timestamp", false, 0, (short) -1);
+        assertEquals(events.size(), 13);
+        assertEventV2Equals(events.get(0), expectedEvents.get(0));
+        assertEventV2Equals(events.get(1), expectedEvents.get(1));
+
+        short expectedUpdateEventsCount = 2;
+        List<EntityAuditEventV2> deletedUpdateEvents   = 
eventRepository.deleteEventsV2(id1, new 
HashSet<>(Arrays.asList(EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE)), 
expectedUpdateEventsCount , 0, false, AtlasAuditAgingType.CUSTOM);
+        List<EntityAuditEventV2> remainingEvents       = 
events.stream().filter(x -> 
!deletedUpdateEvents.contains(x)).collect(Collectors.toList());
+        List<EntityAuditEventV2> remainingUpdateEvents = 
remainingEvents.stream().filter(x -> x.getAction() == 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE).collect(Collectors.toList());
+
+        assertEquals(remainingUpdateEvents.size(), expectedUpdateEventsCount);
+
+        short expectedEventsCount = 4;
+        List<EntityAuditEventV2> deletedEvents = 
eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, false, 
AtlasAuditAgingType.DEFAULT);
+        remainingEvents = events.stream().filter(x -> 
!deletedEvents.contains(x)).collect(Collectors.toList());
+
+        assertEquals(remainingEvents.size(), expectedEventsCount + 1);
+        assertEventV2Equals(remainingEvents.get(0), events.get(0));
+        assertTrue(remainingEvents.contains(events.get(0)));
+
+        List<EntityAuditEventV2> deletedEventsIncludingCreate = 
eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, true, 
AtlasAuditAgingType.DEFAULT);
+        remainingEvents = events.stream().filter(x -> 
!deletedEventsIncludingCreate.contains(x)).collect(Collectors.toList());
+
+        assertEquals(remainingEvents.size(), expectedEventsCount);
+        assertNotEquals(remainingEvents.get(3), events.get(0));
+        assertFalse(remainingEvents.contains(events.get(0)));
+
+        EntityAuditEventV2 latestEvent = new EntityAuditEventV2(id1, 
System.currentTimeMillis(), "User-b", 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j++, entity);
+        eventRepository.putEventsV2(latestEvent);
+        List<EntityAuditEventV2> allEvents = eventRepository.listEventsV2(id1, 
null, "timestamp", false, 0, (short) -1);
+
+        List<EntityAuditEventV2> deletedEventsByTTL   = 
eventRepository.deleteEventsV2(id1, null, expectedUpdateEventsCount , 
ttlInDays, true, AtlasAuditAgingType.DEFAULT);
+        assertEquals(deletedEventsByTTL.size(), allEvents.size() - 1);
+
+        List<EntityAuditEventV2> remainingEventsByTTL = 
allEvents.stream().filter(x -> 
!deletedEventsByTTL.contains(x)).collect(Collectors.toList());
+        assertEquals(remainingEventsByTTL.size(), 1);
+        assertEquals(latestEvent, remainingEventsByTTL.get(0));
+    }
 }
diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml 
b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
index 1550052b4..7a0e8dd16 100644
--- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
+++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
@@ -445,7 +445,7 @@
          -->
         <lst name="defaults">
             <str name="defType">edismax</str>
-            <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l 
iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 
1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 
1ssl_t 1udh_t 1wqt_t 4eth_t 4rgl_s 4pvp_s 4nid_s 4lxh_s 4p39_t 4wzp_t 4t1h_t 
4umd_l 4vet_t 51qd_t 505h_t 53b9_t 5af9_t 5fyd_t 5kp1_t 5ibp_t 5j45_t 5hj9_t 
5r0l_t 5q85_t 5y4l_l 5zph_l 5rt1_t 5xc5_t 68ed_l 69z9_l 66th_t 658l_t 6bk5_t 
61ad_t 622t_t 63np_t 6ltx_t 6mmd_ [...]
+            <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l 
iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 
1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 
1ssl_t 1v5x_t 1wqt_t 1z45_t 4h6t_t 4ttx_s 4s91_s 4pvp_s 4oat_s 4rgl_t 4zd1_t 
4vet_t 4wzp_l 4xs5_t 543p_t 52it_t 55ol_t 5csl_t 5ibp_t 5n2d_t 5kp1_t 5lhh_t 
5jwl_t 5tdx_t 5slh_t 60hx_l 622t_l 5u6d_t 5zph_t 6arp_l 6ccl_l 696t_t 67lx_t 
6dxh_t 63np_t 64g5_t 6611_t 6o79_ [...]
             <str name="hl.fl">*</str>
             <bool name="hl.requireFieldMatch">true</bool>
             <bool name="lowercaseOperators">true</bool>
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java 
b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
index a25a51b5a..e19beccb1 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
@@ -111,7 +111,7 @@ public class ActiveServerFilter implements Filter {
 
     final String adminUriNotFiltered[] = { "/admin/export", "/admin/import", 
"/admin/importfile", "/admin/audits",
             "/admin/purge", "/admin/expimp/audit", "/admin/metrics", 
"/admin/server", "/admin/audit/", "admin/tasks",
-            "/admin/debug/metrics"};
+            "/admin/debug/metrics", "/admin/audits/ageout"};
     private boolean isFilteredURI(ServletRequest servletRequest) {
         HttpServletRequest httpServletRequest = (HttpServletRequest) 
servletRequest;
         String requestURI = httpServletRequest.getRequestURI();
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index b19095b48..4d59fa3d6 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -31,6 +31,7 @@ import org.apache.atlas.discovery.SearchContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.AtlasAuditEntry;
 import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
 import org.apache.atlas.model.audit.AuditSearchParameters;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
@@ -52,6 +53,7 @@ import org.apache.atlas.model.metrics.AtlasMetricsStat;
 import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
 import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.repository.audit.AtlasAuditService;
+import org.apache.atlas.repository.audit.AtlasAuditReductionService;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.impexp.AtlasServerService;
 import org.apache.atlas.repository.impexp.ExportImportAuditService;
@@ -187,6 +189,8 @@ public class AdminResource {
     private final  boolean                  isOnDemandLineageEnabled;
     private final  int                      defaultLineageNodeCount;
 
+    private AtlasAuditReductionService auditReductionService;
+
     static {
         try {
             atlasProperties = ApplicationProperties.get();
@@ -202,7 +206,7 @@ public class AdminResource {
                          AtlasServerService serverService,
                          ExportImportAuditService exportImportAuditService, 
AtlasEntityStore entityStore,
                          AtlasPatchManager patchManager, AtlasAuditService 
auditService, EntityAuditRepository auditRepository,
-                         TaskManagement taskManagement, AtlasDebugMetricsSink 
debugMetricsRESTSink) {
+                         TaskManagement taskManagement, AtlasDebugMetricsSink 
debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService) {
         this.serviceState              = serviceState;
         this.metricsService            = metricsService;
         this.exportService             = exportService;
@@ -219,6 +223,7 @@ public class AdminResource {
         this.auditRepository           = auditRepository;
         this.taskManagement            = taskManagement;
         this.debugMetricsRESTSink      = debugMetricsRESTSink;
+        this.auditReductionService     = atlasAuditReductionService;
 
         if (atlasProperties != null) {
             this.defaultUIVersion = 
atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@@ -820,6 +825,45 @@ public class AdminResource {
         }
     }
 
+    @POST
+    @Path("/audits/ageout")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<AtlasTask> ageoutAuditData(AuditReductionCriteria 
auditReductionCriteria, @QueryParam("useAuditConfig") @DefaultValue("false") 
Boolean useAuditConfig) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+        try {
+            AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_AUDITS), "Admin Audits Ageout");
+
+            if (useAuditConfig) {
+                return auditReductionService.startAuditAgingByConfig();
+            }
+
+            if (!auditReductionCriteria.isAuditAgingEnabled()) {
+                LOG.warn("Audit aging should be enabled");
+                return null;
+            }
+
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"AdminResource.ageoutAuditData(" + auditReductionCriteria + ")");
+            }
+
+            updateCriteriaWithDefaultValues(auditReductionCriteria);
+
+            List<Map<String, Object>> ageoutTypeCriteriaMap = 
auditReductionService.buildAgeoutCriteriaForAllAgingTypes(auditReductionCriteria);
+
+            return 
auditReductionService.startAuditAgingByCriteria(ageoutTypeCriteriaMap);
+
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    private void updateCriteriaWithDefaultValues(AuditReductionCriteria 
auditReductionCriteria) {
+        if (auditReductionCriteria.getDefaultAgeoutTTLInDays() <= 0) {
+            
auditReductionCriteria.setDefaultAgeoutTTLInDays(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt());
+        }
+    }
+
     @POST
     @Path("/audits")
     @Consumes(Servlets.JSON_MEDIA_TYPE)
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java 
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index a4d794615..5b16ba149 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
 
         
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null);
         Response response = adminResource.getStatus();
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
         JsonNode entity = AtlasJson.parseToV1JsonNode((String) 
response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
     public void testResourceGetsValueFromServiceState() throws IOException {
         
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null);
         Response response = adminResource.getStatus();
 
         verify(serviceState).getState();

Reply via email to