This is an automated email from the ASF dual-hosted git repository. jsinovassinnaik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push: new 4d0565385 UNOMI-430: Fix batch profile update by using scrolling (#623) 4d0565385 is described below commit 4d05653851a76f96b0af4db4109ac90f870ee45b Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Fri May 12 18:03:24 2023 +0200 UNOMI-430: Fix batch profile update by using scrolling (#623) --- .../java/org/apache/unomi/api/BatchUpdate.java | 42 +++++++++++++++++++++ .../org/apache/unomi/itests/ProfileServiceIT.java | 43 ++++++++++++++++++++++ .../services/impl/profiles/ProfileServiceImpl.java | 19 ++++++++-- 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java index 45bf9e80a..8f55a5195 100644 --- a/api/src/main/java/org/apache/unomi/api/BatchUpdate.java +++ b/api/src/main/java/org/apache/unomi/api/BatchUpdate.java @@ -27,6 +27,8 @@ public class BatchUpdate { private Object propertyValue; private Condition condition; private String strategy; + private String scrollTimeValidity = "10m"; + private int scrollBatchSize = 1000; /** * Retrieves the property name which value needs to be updated. Note that the property name follows the @@ -101,4 +103,44 @@ public class BatchUpdate { public void setStrategy(String strategy) { this.strategy = strategy; } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying + * how much time the scroll context should stay open in memory to be able to complete the update. + * + * @return the scroll time validity (default: 10m) + */ + public String getScrollTimeValidity() { + return scrollTimeValidity; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll time validity allow specifying + * how much time the scroll context should stay open in memory to be able to complete the update. + * + * @param scrollTimeValidity the scroll time validity in time unit + */ + public void setScrollTimeValidity(String scrollTimeValidity) { + this.scrollTimeValidity = scrollTimeValidity; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying + * how many document we want to load per scroll. + * + * @return the scroll batch size (default: 1000) + */ + public int getScrollBatchSize() { + return scrollBatchSize; + } + + /** + * Batch update will perform scroll queries to query document to be updated, the scroll batch size allow specifying + * how many document we want to load per scroll. + * + * @param scrollBatchSize the scroll batch size (default: 1000) + */ + public void setScrollBatchSize(int scrollBatchSize) { + this.scrollBatchSize = scrollBatchSize; + } } diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index fdd2088a2..306f9d8c8 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -17,6 +17,7 @@ package org.apache.unomi.itests; import org.apache.unomi.api.*; +import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.query.Query; import org.apache.unomi.persistence.spi.PersistenceService; import org.junit.After; @@ -418,4 +419,46 @@ public class ProfileServiceIT extends BaseIT { keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), (count) -> count == (150 + originalEventsCount), 1000, 100); } + + @Test + public void testBatchProfileUpdate() throws Exception { + // Create 50 profiles + for (int i = 1; i <= 50; i++) { + Profile profile = new Profile(); + profile.setItemId("batchProfileUpdateTest" + i); + profile.setProperty("name", "Boby"); + profile.setProperty("test", "batchProfileUpdateTest"); + + profileService.save(profile); + } + + Condition batchUpdateCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + batchUpdateCondition.setParameter("propertyName","properties.test"); + batchUpdateCondition.setParameter("comparisonOperator","equals"); + batchUpdateCondition.setParameter("propertyValue", "batchProfileUpdateTest"); + keepTrying("We should wait for profiles to be saved", () -> persistenceService.queryCount(batchUpdateCondition, Profile.ITEM_TYPE), + (count) -> count == 50, 1000, 100); + + BatchUpdate batchUpdate = new BatchUpdate(); + batchUpdate.setCondition(batchUpdateCondition); + batchUpdate.setStrategy("alwaysSet"); + batchUpdate.setPropertyName("properties.name"); + batchUpdate.setPropertyValue("Billybob"); + batchUpdate.setScrollBatchSize(10); + profileService.batchProfilesUpdate(batchUpdate); + + Condition updatedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + updatedProfilesCondition.setParameter("propertyName","properties.name"); + updatedProfilesCondition.setParameter("comparisonOperator","equals"); + updatedProfilesCondition.setParameter("propertyValue", "Billybob"); + keepTrying("We should still retrieve the 50 updated profiles", () -> persistenceService.queryCount(updatedProfilesCondition, Profile.ITEM_TYPE), + (count) -> count == 50, 1000, 100); + + Condition oldProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + oldProfilesCondition.setParameter("propertyName","properties.name"); + oldProfilesCondition.setParameter("comparisonOperator","equals"); + oldProfilesCondition.setParameter("propertyValue", "Boby"); + keepTrying("We should not be able to retrieve previous profile based on previous value", () -> persistenceService.queryCount(oldProfilesCondition, Profile.ITEM_TYPE), + (count) -> count == 0, 1000, 100); + } } diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index a595ba892..56bf686d8 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -942,14 +942,25 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } public void batchProfilesUpdate(BatchUpdate update) { + logger.info("Starting batch profiles update"); + long startTime = System.currentTimeMillis(); + long updatedCount = 0; + ParserHelper.resolveConditionType(definitionsService, update.getCondition(), "batch update on property " + update.getPropertyName()); - List<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class); + PartialList<Profile> profiles = persistenceService.query(update.getCondition(), null, Profile.class, 0,update.getScrollBatchSize(), update.getScrollTimeValidity()); - for (Profile profile : profiles) { - if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) { - save(profile); + while (profiles != null && profiles.getList().size() > 0) { + for (Profile profile : profiles.getList()) { + if (PropertyHelper.setProperty(profile, update.getPropertyName(), update.getPropertyValue(), update.getStrategy())) { + save(profile); + updatedCount += 1; + } } + profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); } + + long totalTime = System.currentTimeMillis() - startTime; + logger.info("Batch profiles updated: {} profiles in {}ms", updatedCount, totalTime); } public Persona loadPersona(String personaId) {