This is an automated email from the ASF dual-hosted git repository. jsinovassinnaik pushed a commit to branch unomi-1.x in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.x by this push: new a5ad5f6a2 UNOMI-430: Fix batch profile update by using scrolling (#625) a5ad5f6a2 is described below commit a5ad5f6a242e94df4b060e4ae5f2fb47f2806604 Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Fri May 12 18:26:10 2023 +0200 UNOMI-430: Fix batch profile update by using scrolling (#625) --- .../java/org/apache/unomi/api/BatchUpdate.java | 42 +++++++++++++++++++ .../org/apache/unomi/itests/ProfileServiceIT.java | 47 ++++++++++++++++++++-- .../services/impl/profiles/ProfileServiceImpl.java | 19 +++++++-- 3 files changed, 101 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java index 45bf9e80a..8f55a5195 100644 --- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java +++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java @@ -27,6 +27,8 @@ public class BatchUpdate { private Object propertyValue; private Condition condition; private String strategy; + private String scrollTimeValidity = "10m"; + private int scrollBatchSize = 1000; /** * Retrieves the property name which value needs to be updated. Note that the property name follows the @@ -101,4 +103,44 @@ public class BatchUpdate { public void setStrategy(String strategy) { this.strategy = strategy; } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying + * how much time the scroll context should stay open in memory to be able to complete the update. + * + * @return the scroll time validity (default: 10m) + */ + public String getScrollTimeValidity() { + return scrollTimeValidity; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying + * how much time the scroll context should stay open in memory to be able to complete the update. + * + * @param scrollTimeValidity the scroll time validity in time unit + */ + public void setScrollTimeValidity(String scrollTimeValidity) { + this.scrollTimeValidity = scrollTimeValidity; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying + * how many document we want to load per scroll. + * + * @return the scroll batch size (default: 1000) + */ + public int getScrollBatchSize() { + return scrollBatchSize; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying + * how many document we want to load per scroll. + * + * @param scrollBatchSize the scroll batch size (default: 1000) + */ + public void setScrollBatchSize(int scrollBatchSize) { + this.scrollBatchSize = scrollBatchSize; + } } diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index 80573dfee..fe82fa749 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -16,9 +16,8 @@ */ package org.apache.unomi.itests; -import org.apache.unomi.api.Event; -import org.apache.unomi.api.Profile; -import org.apache.unomi.api.Session; +import org.apache.unomi.api.*; +import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.query.Query; import org.apache.unomi.api.services.ProfileService; import org.apache.unomi.persistence.spi.PersistenceService; @@ -288,4 +287,46 @@ public class ProfileServiceIT extends BaseIT { keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), (count) -> count == (150 + originalEventsCount), 1000, 100); } + + @Test + public void testBatchProfileUpdate() throws Exception { + // Create 50 profiles + for (int i = 1; i <= 50; i++) { + Profile profile = new Profile(); + profile.setItemId("batchProfileUpdateTest" + i); + profile.setProperty("name", "Boby"); + profile.setProperty("test", "batchProfileUpdateTest"); + + profileService.save(profile); + } + + Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + batchUpdateCondition.setParameter("propertyName","properties.test"); + batchUpdateCondition.setParameter("comparisonOperator","equals"); + batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest"); + keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE), + (count) -> count == 50, 1000, 100); + + BatchUpdate batchUpdate = new BatchUpdate(); + batchUpdate.setCondition(batchUpdateCondition); + batchUpdate.setStrategy("alwaysSet"); + batchUpdate.setPropertyName("properties.name"); + batchUpdate.setPropertyValue("Billybob"); + batchUpdate.setScrollBatchSize(10); + profileService.batchProfilesUpdate(batchUpdate); + + Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + updatedProfilesCondition.setParameter("propertyName","properties.name"); + updatedProfilesCondition.setParameter("comparisonOperator","equals"); + updatedProfilesCondition.setParameter("propertyValue", "Billybob"); + keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE), + (count) -> count == 50, 1000, 100); + + Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + oldProfilesCondition.setParameter("propertyName","properties.name"); + oldProfilesCondition.setParameter("comparisonOperator","equals"); + oldProfilesCondition.setParameter("propertyValue", "Boby"); + keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE), + (count) -> count == 0, 1000, 100); + } } diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index 09c355edf..0e88f8706 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -814,14 +814,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } public void batchProfilesUpdate(BatchUpdate update) { + logger.info("Starting batch profiles update"); + long startTime = System.currentTimeMillis(); + long updatedCount = 0; + ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName()); - List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class); + PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity()); - for (Profile profile : profiles) { - if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) { - save(profile); + while (profiles != null && profiles.getList().size() > 0) { + for (Profile profile : profiles.getList()) { + if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) { + save(profile); + updatedCount += 1; + } } + profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); } + + long totalTime = System.currentTimeMillis() - startTime; + logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime); } public Persona loadPersona(String personaId) {