This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a commit to branch unomi-1.x
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/unomi-1.x by this push:
     new a5ad5f6a2 UNOMI-430: Fix batch profile update by using scrolling (#625)
a5ad5f6a2 is described below

commit a5ad5f6a242e94df4b060e4ae5f2fb47f2806604
Author: kevan Jahanshahi <jke...@apache.org>
AuthorDate: Fri May 12 18:26:10 2023 +0200

    UNOMI-430: Fix batch profile update by using scrolling (#625)
---
 .../java/org/apache/unomi/api/BatchUpdate.java     | 42 +++++++++++++++++++
 .../org/apache/unomi/itests/ProfileServiceIT.java  | 47 ++++++++++++++++++++--
 .../services/impl/profiles/ProfileServiceImpl.java | 19 +++++++--
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
index 45bf9e80a..8f55a5195 100644
--- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
+++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java
@@ -27,6 +27,8 @@ public class BatchUpdate {
     private Object propertyValue;
     private Condition condition;
     private String strategy;
+    private String scrollTimeValidity = "10m"; // how long the batch-update scroll context should stay open
+    private int scrollBatchSize = 1000; // how many documents a batch update loads per scroll
 
     /**
      * Retrieves the property name which value needs to be updated. Note that 
the property name follows the
@@ -101,4 +103,44 @@ public class BatchUpdate {
     public void setStrategy(String strategy) {
         this.strategy = strategy;
     }
+
+    /**
+     * Retrieves the scroll time validity. Batch updates use scroll queries to retrieve the documents to update, and
+     * this validity specifies how long the scroll context should stay open in memory so that the update can complete.
+     *
+     * @return the scroll time validity (default: 10m)
+     */
+    public String getScrollTimeValidity() {
+        return scrollTimeValidity;
+    }
+
+    /**
+     * Sets the scroll time validity. Batch updates use scroll queries to retrieve the documents to update, and
+     * this validity specifies how long the scroll context should stay open in memory so that the update can complete.
+     *
+     * @param scrollTimeValidity the scroll time validity as a time value string, such as {@code "10m"} (the default)
+     */
+    public void setScrollTimeValidity(String scrollTimeValidity) {
+        this.scrollTimeValidity = scrollTimeValidity;
+    }
+
+    /**
+     * Retrieves the scroll batch size. Batch updates use scroll queries to retrieve the documents to update, and
+     * this size specifies how many documents are loaded per scroll.
+     *
+     * @return the scroll batch size (default: 1000)
+     */
+    public int getScrollBatchSize() {
+        return scrollBatchSize;
+    }
+
+    /**
+     * Sets the scroll batch size. Batch updates use scroll queries to retrieve the documents to update, and
+     * this size specifies how many documents are loaded per scroll.
+     *
+     * @param scrollBatchSize the number of documents to load per scroll (default: 1000)
+     */
+    public void setScrollBatchSize(int scrollBatchSize) {
+        this.scrollBatchSize = scrollBatchSize;
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 80573dfee..fe82fa749 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -16,9 +16,8 @@
  */
 package org.apache.unomi.itests;
 
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.Session;
+import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.Query;
 import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -288,4 +287,46 @@ public class ProfileServiceIT extends BaseIT {
         keepTrying("Events number should be 150", () -> 
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
                 (count) -> count == (150 + originalEventsCount), 1000, 100);
     }
+
+    @Test // batchProfilesUpdate must reach every matching profile even when the matches span several scroll pages
+    public void testBatchProfileUpdate() throws Exception {
+        // Create 50 profiles that match the batch update condition below (properties.test == "batchProfileUpdateTest")
+        for (int i = 1; i <= 50; i++) {
+            Profile profile = new Profile();
+            profile.setItemId("batchProfileUpdateTest" + i);
+            profile.setProperty("name", "Boby");
+            profile.setProperty("test", "batchProfileUpdateTest");
+
+            profileService.save(profile);
+        }
+
+        Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        batchUpdateCondition.setParameter("propertyName","properties.test");
+        batchUpdateCondition.setParameter("comparisonOperator","equals");
+        batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest");
+        keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        BatchUpdate batchUpdate = new BatchUpdate();
+        batchUpdate.setCondition(batchUpdateCondition);
+        batchUpdate.setStrategy("alwaysSet");
+        batchUpdate.setPropertyName("properties.name");
+        batchUpdate.setPropertyValue("Billybob");
+        batchUpdate.setScrollBatchSize(10); // smaller than the 50 matching profiles, forcing several scroll pages
+        profileService.batchProfilesUpdate(batchUpdate);
+
+        Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        updatedProfilesCondition.setParameter("propertyName","properties.name");
+        updatedProfilesCondition.setParameter("comparisonOperator","equals");
+        updatedProfilesCondition.setParameter("propertyValue", "Billybob");
+        keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 50, 1000, 100);
+
+        Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition"));
+        oldProfilesCondition.setParameter("propertyName","properties.name");
+        oldProfilesCondition.setParameter("comparisonOperator","equals");
+        oldProfilesCondition.setParameter("propertyValue", "Boby");
+        keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE),
+                (count) -> count == 0, 1000, 100);
+    }
 }
diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index 09c355edf..0e88f8706 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -814,14 +814,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList
     }
 
     public void batchProfilesUpdate(BatchUpdate update) {
+        logger.info("Starting batch profiles update");
+        long startTime = System.currentTimeMillis();
+        long updatedCount = 0; // profiles for which setProperty returned true and that were therefore saved
+
         ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName());
-        List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class);
+        PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity()); // scroll query: offset 0, page size and scroll validity taken from the BatchUpdate settings
 
-        for (Profile profile : profiles) {
-            if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
-                save(profile);
+        while (profiles != null && profiles.getList().size() > 0) { // stop on a null result or an empty page
+            for (Profile profile : profiles.getList()) {
+                if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) {
+                    save(profile);
+                    updatedCount += 1;
+                }
             }
+            profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); // next page of the same scroll; NOTE(review): nothing here clears the scroll if an exception escapes the loop - confirm it is left to expire
         }
+
+        long totalTime = System.currentTimeMillis() - startTime;
+        logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime);
     }
 
     public Persona loadPersona(String personaId) {

Reply via email to