This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 82d540608 Unomi 739 update purge system (#578)
82d540608 is described below

commit 82d5406080eb39d5dbba39e77302f1e09832b7bb
Author: jsinovassin <58434978+jsinovas...@users.noreply.github.com>
AuthorDate: Thu Mar 2 16:02:59 2023 +0100

    Unomi 739 update purge system (#578)
    
    * UNOMI-739 : add session and event purge property
    
    * UNOMI-739 : update purge mechanism
    
    * UNOMI-739 : remove deletion in rollover policy
    
    * UNOMI-739 : remove useless code and fix test
---
 .../apache/unomi/api/services/ClusterService.java  |   1 +
 .../apache/unomi/api/services/ProfileService.java  |  16 ++-
 .../org/apache/unomi/itests/ProfileServiceIT.java  |  21 ++-
 .../main/resources/etc/custom.system.properties    |   8 +-
 .../ElasticSearchPersistenceServiceImpl.java       |  78 +++++------
 .../unomi/persistence/spi/PersistenceService.java  |  20 +++
 .../rest/endpoints/ClusterServiceEndPoint.java     |   1 +
 .../services/impl/profiles/ProfileServiceImpl.java | 148 ++++++++++++++-------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   4 +
 .../main/resources/org.apache.unomi.services.cfg   |  10 +-
 .../2.2.0/create_rollover_policy_query.json        |   6 -
 11 files changed, 204 insertions(+), 109 deletions(-)

diff --git 
a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java 
b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index 4c89ba94b..299ac9098 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -41,6 +41,7 @@ public interface ClusterService {
      *
      * @param date the Date before which all data needs to be removed
      */
+    @Deprecated
     void purge(final Date date);
 
     /**
diff --git 
a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java 
b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index 527e77fec..03da6ce9b 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -417,7 +417,6 @@ public interface ProfileService {
      * Purge (delete) profiles
      * example: Purge profile inactive since 10 days only:
      * purgeProfiles(10, 0);
-     *
      * example: Purge profile created since 30 days only:
      * purgeProfiles(0, 30);
      *
@@ -427,8 +426,21 @@ public interface ProfileService {
     void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
 
     /**
-     * Purge (delete) monthly indices by removing old indices
+     * Purge (delete) session items
+     * @param existsNumberOfDays will purge sessions created more than this number 
of days ago (0 or a negative value will have no effect)
+     */
+    void purgeSessionItems(int existsNumberOfDays);
+
+    /**
+     * Purge (delete) event items
+     * @param existsNumberOfDays will purge events created more than this number 
of days ago (0 or a negative value will have no effect)
+     */
+    void purgeEventItems(int existsNumberOfDays);
+
+    /**
+     * Use purgeSessionItems and purgeEventItems to remove rollover items 
instead
      * @param existsNumberOfMonths used to remove monthly indices older than 
this number of months
      */
+    @Deprecated
     void purgeMonthlyItems(int existsNumberOfMonths);
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 79a3bf843..455ab9073 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -372,30 +372,37 @@ public class ProfileServiceIT extends BaseIT {
                 (count) -> count == (450 + originalEventsCount), 1000, 100);
 
         // Should have no effect
-        profileService.purgeMonthlyItems(0);
+        profileService.purgeSessionItems(0);
         keepTrying("Sessions number should be 450", () -> 
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
                 (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        profileService.purgeEventItems(0);
         keepTrying("Events number should be 450", () -> 
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (450 + originalEventsCount), 1000, 100);
 
-        // Should have no effect there is no monthly items older than 40 months
-        profileService.purgeMonthlyItems(40);
+        // Should have no effect there is no sessions items older than 1200 
days
+        profileService.purgeSessionItems(1200);
         keepTrying("Sessions number should be 450", () -> 
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
                 (count) -> count == (450 + originalSessionsCount), 1000, 100);
+        // Should have no effect there is no events items older than 1200 days
+        profileService.purgeEventItems(1200);
         keepTrying("Events number should be 450", () -> 
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (450 + originalEventsCount), 1000, 100);
 
-        // Should purge monthly items older than 25 days
-        profileService.purgeMonthlyItems(25);
+        // Should purge sessions older than 750 days
+        profileService.purgeSessionItems(750);
         keepTrying("Sessions number should be 300", () -> 
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
                 (count) -> count == (300 + originalSessionsCount), 1000, 100);
+        // Should purge events older than 750 days
+        profileService.purgeEventItems(750);
         keepTrying("Events number should be 300", () -> 
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (300 + originalEventsCount), 1000, 100);
 
-        // Should purge monthly items older than 5 days
-        profileService.purgeMonthlyItems(5);
+        // Should purge sessions older than 150 days
+        profileService.purgeSessionItems(150);
         keepTrying("Sessions number should be 150", () -> 
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
                 (count) -> count == (150 + originalSessionsCount), 1000, 100);
+        // Should purge events older than 150 days
+        profileService.purgeEventItems(150);
         keepTrying("Events number should be 150", () -> 
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index e8b049d40..ab10c3a8e 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -162,8 +162,14 @@ 
org.apache.unomi.profile.purge.interval=${env:UNOMI_PROFILE_PURGE_INTERVAL:-1}
 
org.apache.unomi.profile.purge.inactiveTime=${env:UNOMI_PROFILE_PURGE_INACTIVETIME:-180}
 # Purge profiles that have been created for a specific number of days
 
org.apache.unomi.profile.purge.existTime=${env:UNOMI_PROFILE_PURGE_EXISTTIME:--1}
-# Purge all monthly indexes (sessions/events) that have been created for a 
specific number of months
+# Deprecated: Use org.apache.unomi.session.purge.existTime and 
org.apache.unomi.event.purge.existTime instead.
+# If there is no value for org.apache.unomi.session.purge.existTime or 
org.apache.unomi.event.purge.existTime, the value
+# of this property will be used for the purge.
 
org.apache.unomi.monthly.index.purge.existTime=${env:UNOMI_MONTHLY_INDEX_PURGE_EXISTTIME:-12}
+# Purge sessions that have been created for a specific number of days
+org.apache.unomi.session.purge.existTime=${env:UNOMI_SESSION_PURGE_EXISTTIME:--1}
+# Purge events that have been created for a specific number of days
+org.apache.unomi.event.purge.existTime=${env:UNOMI_EVENT_PURGE_EXISTTIME:--1}
 # Refresh Elasticsearch after saving a profile
 
org.apache.unomi.profile.forceRefreshOnSave=${env:UNOMI_PROFILE_REFRESH_ON_SAVE:-false}
 # When performing segment updates, this controls the size of the scrolling 
query size used to iterate over all the
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 13a802db0..d14d284d9 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -73,13 +73,7 @@ import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
@@ -154,8 +148,6 @@ import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -170,6 +162,8 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -1409,10 +1403,6 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                 );
                 phases.put("hot", new Phase("hot", TimeValue.ZERO, 
hotActions));
 
-                // TODO - Handle this with the purge 
https://issues.apache.org/jira/browse/UNOMI-726
-                Map<String, LifecycleAction> deleteActions = 
Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
-                phases.put("delete", new Phase("delete", new TimeValue(90, 
TimeUnit.DAYS), deleteActions));
-
                 LifecyclePolicy policy = new LifecyclePolicy(indexPrefix + "-" 
+ ROLLOVER_LIFECYCLE_NAME, phases);
                 PutLifecyclePolicyRequest request = new 
PutLifecyclePolicyRequest(policy);
                 org.elasticsearch.client.core.AcknowledgedResponse 
putLifecyclePolicy = client.indexLifecycle().putLifecyclePolicy(request, 
RequestOptions.DEFAULT);
@@ -1454,8 +1444,8 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
-    public boolean removeIndex(final String itemType) {
-        String index = getIndex(itemType);
+    public boolean removeIndex(final String itemType, boolean addPrefix){
+        String index = addPrefix ? getIndex(itemType) : itemType;
 
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".removeIndex", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
@@ -1475,6 +1465,9 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
             return result;
         }
     }
+    public boolean removeIndex(final String itemType) {
+        return removeIndex(itemType, true);
+    }
 
     private void internalCreateRolloverTemplate(String itemName) throws 
IOException {
         String rolloverAlias = indexPrefix + "-" + itemName;
@@ -1925,6 +1918,29 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
+    @Override
+    public Map<String, Long> docCountPerIndex(String... indexes) {
+        return new InClassLoaderExecute<Map<String, Long>>(metricsService, 
this.getClass().getName() + ".docCountPerIndex", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            @Override
+            protected Map<String, Long> execute(Object... args) throws 
IOException {
+                List<String> indexesForQuery = Stream.of(indexes).map(index -> 
getIndexNameForQuery(index)).collect(Collectors.toList());
+                String[] itemsArray = new String[indexesForQuery.size()];
+                itemsArray = indexesForQuery.toArray(itemsArray);
+                GetIndexRequest request = new GetIndexRequest(itemsArray);
+                GetIndexResponse getIndexResponse = 
client.indices().get(request, RequestOptions.DEFAULT);
+
+                Map<String, Long> countPerIndex = new HashMap<>();
+
+                for (String index : getIndexResponse.getIndices()) {
+                    CountRequest countRequest = new CountRequest(index);
+                    CountResponse response = client.count(countRequest, 
RequestOptions.DEFAULT);
+                    countPerIndex.put(index, response.getCount());
+                }
+                return countPerIndex;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
     private long queryCount(final QueryBuilder filter, final String itemType) {
         return new InClassLoaderExecute<Long>(metricsService, 
this.getClass().getName() + ".queryCount", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
 
@@ -2404,38 +2420,6 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
 
     @Override
     public void purge(final Date date) {
-        new InClassLoaderExecute<Object>(metricsService, 
this.getClass().getName() + ".purgeWithDate", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
-            @Override
-            protected Object execute(Object... args) throws Exception {
-
-                GetIndexRequest getIndexRequest = new 
GetIndexRequest(getAllIndexForQuery());
-                GetIndexResponse getIndexResponse = 
client.indices().get(getIndexRequest, RequestOptions.DEFAULT);
-                String[] indices = getIndexResponse.getIndices();
-
-                SimpleDateFormat d = new SimpleDateFormat("yyyy-MM");
-
-                List<String> toDelete = new ArrayList<>();
-                for (String currentIndexName : indices) {
-                    int indexDatePrefixPos = 
currentIndexName.indexOf(INDEX_DATE_PREFIX);
-                    if (indexDatePrefixPos > -1) {
-                        try {
-                            Date indexDate = 
d.parse(currentIndexName.substring(indexDatePrefixPos + 
INDEX_DATE_PREFIX.length()));
-
-                            if (indexDate.before(date)) {
-                                toDelete.add(currentIndexName);
-                            }
-                        } catch (ParseException e) {
-                            throw new Exception("Cannot parse index name " + 
currentIndexName, e);
-                        }
-                    }
-                }
-                if (!toDelete.isEmpty()) {
-                    DeleteIndexRequest deleteIndexRequest = new 
DeleteIndexRequest(toDelete.toArray(new String[toDelete.size()]));
-                    client.indices().delete(deleteIndexRequest, 
RequestOptions.DEFAULT);
-                }
-                return null;
-            }
-        }.catchingExecuteInClassLoader(true);
     }
 
     @Override
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index d0911131d..751a34c85 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -680,8 +680,19 @@ public interface PersistenceService {
      *
      * @param date the date (not included) before which we want to erase all 
data
      */
+    @Deprecated
     void purge(Date date);
 
+    /**
+     * Retrieves the number of documents per index.
+     * If the index is a rollover index, each rollover index will be returned 
with its own number of documents.
+     * For example: with "event" as parameter, the indexes named 
...-event-000001, ...-event-000002 and so on will be returned
+     *
+     * @param indexes names of the indexes to count the documents
+     * @return Map where the key is the index name and the value is the number 
of documents for this index
+     */
+    Map<String, Long> docCountPerIndex(String... indexes);
+
     /**
      * Retrieves all items of the specified Item subclass which specified 
ranged property is within the specified bounds, ordered according to the 
specified {@code sortBy} String
      * and and paged: only {@code size} of them are retrieved, starting with 
the {@code offset}-th one.
@@ -733,6 +744,15 @@ public interface PersistenceService {
      */
     boolean removeIndex(final String itemType);
 
+    /**
+     * Removes the index for the specified item type.
+     *
+     * @param itemType the item type
+     * @param addPrefix should add the index prefix to the itemType passed as 
parameter
+     * @return {@code true} if the operation was successful, {@code false} 
otherwise
+     */
+    boolean removeIndex(final String itemType, boolean addPrefix);
+
     /**
      * Removes all data associated with the provided scope.
      *
diff --git 
a/rest/src/main/java/org/apache/unomi/rest/endpoints/ClusterServiceEndPoint.java
 
b/rest/src/main/java/org/apache/unomi/rest/endpoints/ClusterServiceEndPoint.java
index 9cfeae9a2..d0c7cf29b 100644
--- 
a/rest/src/main/java/org/apache/unomi/rest/endpoints/ClusterServiceEndPoint.java
+++ 
b/rest/src/main/java/org/apache/unomi/rest/endpoints/ClusterServiceEndPoint.java
@@ -87,6 +87,7 @@ public class ClusterServiceEndPoint {
      */
     @GET
     @Path("/purge/{date}")
+    @Deprecated
     public void purge(@PathParam("date") String date) {
         try {
             clusterService.purge(new 
SimpleDateFormat("yyyy-MM-dd").parse(date));
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index c12dc411f..7912cc6de 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -20,18 +20,7 @@ package org.apache.unomi.services.impl.profiles;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.BatchUpdate;
-import org.apache.unomi.api.Item;
-import org.apache.unomi.api.PartialList;
-import org.apache.unomi.api.Persona;
-import org.apache.unomi.api.PersonaSession;
-import org.apache.unomi.api.PersonaWithSessions;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
-import org.apache.unomi.api.PropertyMergeStrategyExecutor;
-import org.apache.unomi.api.PropertyMergeStrategyType;
-import org.apache.unomi.api.PropertyType;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.query.Query;
@@ -56,25 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -203,7 +174,14 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
 
     private Integer purgeProfileExistTime = 0;
     private Integer purgeProfileInactiveTime = 0;
+
+    /**
+     * Use purgeSessionExistTime and purgeEventExistTime instead
+     */
+    @Deprecated
     private Integer purgeSessionsAndEventsTime = 0;
+    private Integer purgeSessionExistTime = 0;
+    private Integer purgeEventExistTime = 0;
     private Integer purgeProfileInterval = 0;
     private TimerTask purgeTask = null;
     private long propertiesRefreshInterval = 10000;
@@ -256,6 +234,7 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
             }
         }
         bundleContext.addBundleListener(this);
+        initializeDefaultPurgeValuesIfNecessary();
         initializePurge();
         schedulePropertyTypeLoad();
         logger.info("Profile service initialized.");
@@ -283,6 +262,22 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
     private void processBundleStop(BundleContext bundleContext) {
     }
 
+    /**
+     * Fill purgeEventExistTime and purgeSessionExistTime from the old 
property purgeSessionsAndEventsTime (months, converted to days as months * 30)
+     * if no positive value is set for these properties. This keeps the old 
property usable.
+     * This method should be removed once the purgeSessionsAndEventsTime 
property is deleted.
+     */
+    private void initializeDefaultPurgeValuesIfNecessary() {
+        if (purgeSessionsAndEventsTime > 0) {
+            if (purgeEventExistTime <= 0) {
+                purgeEventExistTime = purgeSessionsAndEventsTime * 30;
+            }
+            if (purgeSessionExistTime <= 0) {
+                purgeSessionExistTime = purgeSessionsAndEventsTime * 30;
+            }
+        }
+    }
+
     public void setPurgeProfileExistTime(Integer purgeProfileExistTime) {
         this.purgeProfileExistTime = purgeProfileExistTime;
     }
@@ -291,6 +286,7 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
         this.purgeProfileInactiveTime = purgeProfileInactiveTime;
     }
 
+    @Deprecated
     public void setPurgeSessionsAndEventsTime(Integer 
purgeSessionsAndEventsTime) {
         this.purgeSessionsAndEventsTime = purgeSessionsAndEventsTime;
     }
@@ -299,6 +295,14 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
         this.purgeProfileInterval = purgeProfileInterval;
     }
 
+    public void setPurgeSessionExistTime(Integer purgeSessionExistTime) {
+        this.purgeSessionExistTime = purgeSessionExistTime;
+    }
+
+    public void setPurgeEventExistTime(Integer purgeEventExistTime) {
+        this.purgeEventExistTime = purgeEventExistTime;
+    }
+
     private void schedulePropertyTypeLoad() {
         propertyTypeLoadTask = new TimerTask() {
             @Override
@@ -366,26 +370,86 @@ public class ProfileServiceImpl implements 
ProfileService, SynchronousBundleList
         }
     }
 
+    private <T extends Item> void purgeRolloverItems(int existsNumberOfDays, 
Class<T> clazz) {
+        if (existsNumberOfDays > 0) {
+            String conditionType = null;
+            String itemType = null;
+
+            if (clazz.getName().equals(Event.class.getName())) {
+                conditionType = "eventPropertyCondition";
+                itemType = Event.ITEM_TYPE;
+            } else if (clazz.getName().equals(Session.class.getName())) {
+                conditionType = "sessionPropertyCondition";
+                itemType = Session.ITEM_TYPE;
+            }
+
+            ConditionType propertyConditionType = 
definitionsService.getConditionType(conditionType);
+            if (propertyConditionType == null) {
+                // definition service not yet fully instantiated
+                return;
+            }
+
+            Condition condition = new Condition(propertyConditionType);
+
+            condition.setParameter("propertyName", "timeStamp");
+            condition.setParameter("comparisonOperator", "lessThanOrEqualTo");
+            condition.setParameter("propertyValueDateExpr", "now-" + 
existsNumberOfDays + "d");
+            persistenceService.removeByQuery(condition, clazz);
+            deleteEmptyRolloverIndex(itemType);
+        }
+    }
+
+    @Override
+    public void purgeSessionItems(int existsNumberOfDays) {
+        if (existsNumberOfDays > 0) {
+            logger.info("Purging: Sessions created since more than {} days", 
existsNumberOfDays);
+            purgeRolloverItems(existsNumberOfDays, Session.class);
+        }
+    }
+
+    @Override
+    public void purgeEventItems(int existsNumberOfDays) {
+        if (existsNumberOfDays > 0) {
+            logger.info("Purging: Events created since more than {} days", 
existsNumberOfDays);
+            purgeRolloverItems(existsNumberOfDays, Event.class);
+        }
+    }
+
+    @Deprecated
     @Override
     public void purgeMonthlyItems(int existsNumberOfMonths) {
-        if (existsNumberOfMonths > 0) {
-            logger.info("Purging: Monthly items (sessions/events) created 
before {} months", existsNumberOfMonths);
-            
persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+
+    }
+
+    public void deleteEmptyRolloverIndex(String indexName) {
+        TreeMap<String, Long> countsPerIndex = new 
TreeMap<>(persistenceService.docCountPerIndex(indexName));
+        if (countsPerIndex.size() >= 1) {
+            // do not check the last index, because it's the one used to write 
documents
+            countsPerIndex.pollLastEntry();
+            countsPerIndex.forEach((index, count) -> {
+                if (count == 0) {
+                    persistenceService.removeIndex(index, false);
+                }
+            });
         }
     }
 
     private void initializePurge() {
         logger.info("Purge: Initializing");
 
-        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || 
purgeSessionsAndEventsTime > 0) {
+        if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || 
purgeSessionExistTime > 0 || purgeEventExistTime > 0) {
             if (purgeProfileInactiveTime > 0) {
                 logger.info("Purge: Profile with no visits since more than {} 
days, will be purged", purgeProfileInactiveTime);
             }
             if (purgeProfileExistTime > 0) {
                 logger.info("Purge: Profile created since more than {} days, 
will be purged", purgeProfileExistTime);
             }
-            if (purgeSessionsAndEventsTime > 0) {
-                logger.info("Purge: Monthly items (sessions/events) created 
since more than {} months, will be purged", purgeSessionsAndEventsTime);
+
+            if (purgeSessionExistTime > 0) {
+                logger.info("Purge: Session items created since more than {} 
days, will be purged", purgeSessionExistTime);
+            }
+            if (purgeEventExistTime > 0) {
+                logger.info("Purge: Event items created since more than {} 
days, will be purged", purgeEventExistTime);
             }
 
             purgeTask = new TimerTask() {
@@ -399,8 +463,8 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
                         purgeProfiles(purgeProfileInactiveTime, 
purgeProfileExistTime);
 
                         // Monthly items purge
-                        purgeMonthlyItems(purgeSessionsAndEventsTime);
-
+                        purgeSessionItems(purgeSessionExistTime);
+                        purgeEventItems(purgeEventExistTime);
                         logger.info("Purge: executed in {} ms", 
System.currentTimeMillis() - purgeStartTime);
                     } catch (Throwable t) {
                         logger.error("Error while purging", t);
@@ -416,12 +480,6 @@ public class ProfileServiceImpl implements ProfileService, 
SynchronousBundleList
         }
     }
 
-    private GregorianCalendar getMonth(int offset) {
-        GregorianCalendar gc = new GregorianCalendar();
-        gc = new GregorianCalendar(gc.get(Calendar.YEAR), 
gc.get(Calendar.MONTH), 1);
-        gc.add(Calendar.MONTH, offset);
-        return gc;
-    }
 
     public long getAllProfilesCount() {
         return persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 3347588ab..f49f3c849 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,6 +29,8 @@
             <cm:property name="profile.purge.inactiveTime" value="180"/>
             <cm:property name="profile.purge.existTime" value="-1"/>
             <cm:property name="monthly.index.purge.existTime" value="12"/>
+            <cm:property name="session.purge.existTime" value="-1"/>
+            <cm:property name="event.purge.existTime" value="-1"/>
             <cm:property name="segment.update.batchSize" value="1000"/>
             <cm:property name="profile.forceRefreshOnSave" value="false"/>
             <cm:property name="definitions.refresh.interval" value="10000"/>
@@ -235,6 +237,8 @@
         <property name="purgeProfileInactiveTime" 
value="${services.profile.purge.inactiveTime}"/>
         <property name="purgeProfileExistTime" 
value="${services.profile.purge.existTime}"/>
         <property name="purgeSessionsAndEventsTime" 
value="${services.monthly.index.purge.existTime}"/>
+        <property name="purgeSessionExistTime" 
value="${services.session.purge.existTime}"/>
+        <property name="purgeEventExistTime" 
value="${services.event.purge.existTime}"/>
         <property name="forceRefreshOnSave" 
value="${services.profile.forceRefreshOnSave}"/>
         <property name="propertiesRefreshInterval" 
value="${services.properties.refresh.interval}"/>
         <property name="schedulerService" ref="schedulerServiceImpl"/>
diff --git a/services/src/main/resources/org.apache.unomi.services.cfg 
b/services/src/main/resources/org.apache.unomi.services.cfg
index 239305529..818b9ca78 100644
--- a/services/src/main/resources/org.apache.unomi.services.cfg
+++ b/services/src/main/resources/org.apache.unomi.services.cfg
@@ -27,9 +27,17 @@ 
profile.purge.existTime=${org.apache.unomi.profile.purge.existTime:--1}
 # Refresh Elasticsearch after saving a profile
 
profile.forceRefreshOnSave=${org.apache.unomi.profile.forceRefreshOnSave:-false}
 
-# Purge all monthly indexes (sessions/events) that have been created for a 
specific number of months
+# Deprecated: Use session.purge.existTime and event.purge.existTime instead.
+# If there is no value for session.purge.existTime or event.purge.existTime, 
the value
+# of this property will be used for the purge.
 
monthly.index.purge.existTime=${org.apache.unomi.monthly.index.purge.existTime:-12}
 
+# Purge sessions that have been created for a specific number of days
+session.purge.existTime=${org.apache.unomi.session.purge.existTime:--1}
+
+# Purge events that have been created for a specific number of days
+event.purge.existTime=${org.apache.unomi.event.purge.existTime:--1}
+
 # When performing segment updates, this controls the size of the scrolling 
query size used to iterate over all the
 # profiles that need updating
 segment.update.batchSize=${org.apache.unomi.segment.update.batchSize:-1000}
diff --git 
a/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
 
b/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
index c9bc94a29..2084db3e7 100644
--- 
a/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
+++ 
b/tools/shell-commands/src/main/resources/requestBody/2.2.0/create_rollover_policy_query.json
@@ -7,12 +7,6 @@
             #rolloverHotActions
           }
         }
-      },
-      "delete": {
-        "min_age": "90d",
-        "actions": {
-          "delete": {}
-        }
       }
     }
   }

Reply via email to