This is an automated email from the ASF dual-hosted git repository. taybou pushed a commit to branch unomi-1.5.x in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 8149e9c1fa73ba62eabd55e810028af293d0c8cc Author: giladw <[email protected]> AuthorDate: Wed Feb 17 16:15:51 2021 +0100 update segments with its own bulk request, and use retry for failures (#237) (cherry picked from commit 73403962bcd46b42fe6172f4bded454ce553ce42) --- .../main/resources/etc/custom.system.properties | 8 + .../ElasticSearchPersistenceServiceImpl.java | 75 ++++++-- .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + .../org.apache.unomi.persistence.elasticsearch.cfg | 3 +- .../unomi/persistence/spi/PersistenceService.java | 12 ++ services/pom.xml | 6 + .../services/impl/segments/SegmentServiceImpl.java | 195 +++++++++++++-------- .../resources/OSGI-INF/blueprint/blueprint.xml | 9 + .../main/resources/org.apache.unomi.services.cfg | 12 ++ 9 files changed, 229 insertions(+), 93 deletions(-) diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index a7b90f3..924cc07 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -151,6 +151,14 @@ org.apache.unomi.profile.forceRefreshOnSave=${env:UNOMI_PROFILE_REFRESH_ON_SAVE: # When performing segment updates, this controls the size of the scrolling query size used to iterate over all the # profiles that need updating org.apache.unomi.segment.update.batchSize=${env:UNOMI_SEGMENT_UPDATE_BATCHSIZE:-1000} +# Run Batch request separately for updating segments in profiles +org.apache.unomi.segment.batch.update=${env:UNOMI_SEGMENT_BATCH_PROFILE_UPDATE:-false} +# Send Profile Updated Event for every profile segment update +org.apache.unomi.segment.send.profile.update.event=${env: UNOMI_SEGMENT_SEND_PROFILE_UPDATE_EVENT:-true} +# When performing segment updates, can retry an update in case of an error to a single profile +org.apache.unomi.services.segment.max.retries.update.profile.segment=${env:UNOMI_SEGMENT_UPDATE_MAX_RETRIES:-0} +# When performing retry of 
segment update after a request was failed, delay of requests +org.apache.unomi.services.segment.update.segment.retry.seconds.delay=${env:UNOMI_SEGMENT_UPDATE_RETRY_DELAY:-1} # The interval in milliseconds to use to reload the definitions (condition types and action types) org.apache.unomi.definitions.refresh.interval=${env:UNOMI_DEFINITIONS_REFRESH_INTERVAL:-10000} # The interval in milliseconds to use to reload the property types diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 7362cf7..9b75bdf 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -194,12 +194,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Set<String> itemClassesToCacheSet = new HashSet<>(); private String itemClassesToCache; private boolean useBatchingForSave = false; + private boolean useBatchingForUpdate = true; + private boolean alwaysOverwrite = true; private boolean aggQueryThrowOnMissingDocs = false; private Integer aggQueryMaxResponseSizeHttp = null; private Integer clientSocketTimeout = null; - private boolean alwaysOverwrite = true; - private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>(); public void setBundleContext(BundleContext bundleContext) { @@ -349,6 +349,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.useBatchingForSave = useBatchingForSave; } + public void setUseBatchingForUpdate(boolean useBatchingForUpdate) { + this.useBatchingForUpdate = useBatchingForUpdate; + } + public void setUsername(String username) { 
this.username = username; } @@ -878,21 +882,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { - String itemType = Item.getItemType(clazz); - UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId()); - updateRequest.doc(source); - - if (!alwaysOverwrite) { - Long seqNo = (Long)item.getSystemMetadata(SEQ_NO); - Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM); - - if (seqNo != null && primaryTerm != null) { - updateRequest.setIfSeqNo(seqNo); - updateRequest.setIfPrimaryTerm(primaryTerm); - } - } + UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite); - if (bulkProcessor == null) { + if (bulkProcessor == null || !useBatchingForUpdate) { UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); } else { @@ -911,6 +903,57 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private UpdateRequest createUpdateRequest(Class clazz, Date dateHint, Item item, Map source, boolean alwaysOverwrite) { + String itemType = Item.getItemType(clazz); + UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId()); + updateRequest.doc(source); + + if (!alwaysOverwrite) { + Long seqNo = (Long) item.getSystemMetadata(SEQ_NO); + Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM); + + if (seqNo != null && primaryTerm != null) { + updateRequest.setIfSeqNo(seqNo); + updateRequest.setIfPrimaryTerm(primaryTerm); + } + } + return updateRequest; + } + + @Override + public List<String> update(final Map<Item, Map> items, final Date 
dateHint, final Class clazz) { + if (items.size() == 0) + return new ArrayList<>(); + + List<String> result = new InClassLoaderExecute<List<String>>(metricsService, this.getClass().getName() + ".updateItems", this.bundleContext, this.fatalIllegalStateErrors) { + protected List<String> execute(Object... args) throws Exception { + long batchRequestStartTime = System.currentTimeMillis(); + + BulkRequest bulkRequest = new BulkRequest(); + items.forEach((item, source) -> { + UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite); + bulkRequest.add(updateRequest); + }); + + BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); + logger.debug("{} profiles updated with bulk segment in {}ms", bulkRequest.numberOfActions(), System.currentTimeMillis() - batchRequestStartTime); + + List<String> failedItemsIds = new ArrayList<>(); + + if (bulkResponse.hasFailures()){ + Iterator<BulkItemResponse> iterator = bulkResponse.iterator(); + iterator.forEachRemaining(bulkItemResponse -> { + failedItemsIds.add(bulkItemResponse.getId()); + }); + } + return failedItemsIds; + } + }.catchingExecuteInClassLoader(true); + + return result; + } + + @Override public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors) { diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 6f40799..8a52c5f 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -58,6 +58,7 @@ <cm:property 
name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemClassesToCache" value="" /> <cm:property name="useBatchingForSave" value="false" /> + <cm:property name="useBatchingForUpdate" value="true" /> <cm:property name="username" value="" /> <cm:property name="password" value="" /> @@ -133,6 +134,7 @@ <property name="hazelcastInstance" ref="hazelcastInstance" /> <property name="itemClassesToCache" value="${es.itemClassesToCache}" /> <property name="useBatchingForSave" value="${es.useBatchingForSave}" /> + <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" /> <property name="username" value="${es.username}" /> <property name="password" value="${es.password}" /> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index ac30c91..ce2fb67 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -74,4 +74,5 @@ sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates # Errors throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false} -alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true} \ No newline at end of file +alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true} +useBatchingForUpdate=${org.apache.unomi.elasticsearch.useBatchingForUpdate:-true} diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index c82731a..18d53c3 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ 
-25,6 +25,7 @@ import org.apache.unomi.persistence.spi.aggregate.BaseAggregate; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * A service to provide persistence and retrieval of context server entities. @@ -175,6 +176,17 @@ public interface PersistenceService { boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite); /** + * Updates every item used as a key of the specified map, all of the specified class, with the properties held in the corresponding map value. For each entry this is similar to + * {@code update(item, dateHint, clazz, source)} + * + * @param items a map that consists of the items to update (keys) and the properties to update on each of them (values) + * @param dateHint a Date helping in identifying where the items are located + * @param clazz the Item subclass of the items to update + * @return the list of ids of the items that failed to update; an empty list if all updates were successful, or null if the whole operation failed + */ + List<String> update(Map<Item, Map> items, Date dateHint, Class clazz); + + /** * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. 
Same as * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * diff --git a/services/pom.xml b/services/pom.xml index 988a30b..4ec7d4a 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -75,6 +75,12 @@ </dependency> <dependency> + <groupId>net.jodah</groupId> + <artifactId>failsafe</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> <groupId>com.github.fge</groupId> <artifactId>json-patch</artifactId> <version>1.9</version> diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index a0a1a02..363395d 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -18,7 +18,10 @@ package org.apache.unomi.services.impl.segments; import com.fasterxml.jackson.core.JsonProcessingException; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; import org.apache.unomi.api.Event; +import org.apache.unomi.api.Item; import org.apache.unomi.api.Metadata; import org.apache.unomi.api.PartialList; import org.apache.unomi.api.Profile; @@ -46,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URL; import java.security.MessageDigest; +import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; @@ -65,7 +69,10 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private int segmentUpdateBatchSize = 1000; private long segmentRefreshInterval = 1000; private int aggregateQueryBucketSize = 5000; - + private int maxRetriesForUpdateProfileSegment = 0; + private long secondsDelayForRetryUpdateProfileSegment = 1; + private boolean batchSegmentProfileUpdate = false; + private boolean sendProfileUpdateEventForSegmentUpdate = true; private int 
maximumIdsQueryCount = 5000; private boolean pastEventsDisablePartitions = false; @@ -109,6 +116,22 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe this.segmentRefreshInterval = segmentRefreshInterval; } + public void setMaxRetriesForUpdateProfileSegment(int maxRetriesForUpdateProfileSegment) { + this.maxRetriesForUpdateProfileSegment = maxRetriesForUpdateProfileSegment; + } + + public void setSecondsDelayForRetryUpdateProfileSegment(long secondsDelayForRetryUpdateProfileSegment) { + this.secondsDelayForRetryUpdateProfileSegment = secondsDelayForRetryUpdateProfileSegment; + } + + public void setBatchSegmentProfileUpdate(boolean batchSegmentProfileUpdate) { + this.batchSegmentProfileUpdate = batchSegmentProfileUpdate; + } + + public void setSendProfileUpdateEventForSegmentUpdate(boolean sendProfileUpdateEventForSegmentUpdate){ + this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate; + } + public void postConstruct() { logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); loadPredefinedSegments(bundleContext); @@ -361,15 +384,17 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class); long updatedProfileCount = 0; long profileRemovalStartTime = System.currentTimeMillis(); - for (Profile profileToRemove : previousProfiles) { - profileToRemove.getSegments().remove(segmentId); - Map<String,Object> sourceMap = new HashMap<>(); - sourceMap.put("segments", profileToRemove.getSegments()); - profileToRemove.setSystemProperty("lastUpdated", new Date()); - sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove, null, Profile.class, sourceMap); - updatedProfileCount++; + if (batchSegmentProfileUpdate && previousProfiles.size() > 0) { + batchUpdateProfilesSegment(segmentId, previousProfiles, false); + } + else { + 
for (Profile profileToRemove : previousProfiles) { + Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToRemove, segmentId, false); + persistenceService.update(profileToRemove, null, Profile.class, sourceMap); + } } + + updatedProfileCount += previousProfiles.size(); logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime); // update impacted segments @@ -417,7 +442,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe return new DependentMetadata(segments, scorings); } - public PartialList<Profile> getMatchingIndividuals(String segmentID, int offset, int size, String sortBy) { Segment segment = getSegmentDefinition(segmentID); if (segment == null) { @@ -889,14 +913,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private void updateExistingProfilesForSegment(Segment segment) { long updateProfilesForSegmentStartTime = System.currentTimeMillis(); - Condition segmentCondition = new Condition(); - long updatedProfileCount = 0; + final String segmentId = segment.getItemId(); + Condition segmentCondition = new Condition(); segmentCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition")); segmentCondition.setParameter("propertyName", "segments"); segmentCondition.setParameter("comparisonOperator", "equals"); - segmentCondition.setParameter("propertyValue", segment.getItemId()); + segmentCondition.setParameter("propertyValue", segmentId); if (segment.getMetadata().isEnabled()) { @@ -921,74 +945,93 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe profilesToRemoveSubConditions.add(notNewSegmentCondition); profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions); - PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); - 
PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); - - while (profilesToAdd.getList().size() > 0) { - long profilesToAddStartTime = System.currentTimeMillis(); - for (Profile profileToAdd : profilesToAdd.getList()) { - profileToAdd.getSegments().add(segment.getItemId()); - Map<String,Object> sourceMap = new HashMap<>(); - sourceMap.put("segments", profileToAdd.getSegments()); - profileToAdd.setSystemProperty("lastUpdated", new Date()); - sourceMap.put("systemProperties", profileToAdd.getSystemProperties()); - persistenceService.update(profileToAdd, null, Profile.class, sourceMap); - Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); - updatedProfileCount++; - } - logger.info("{} profiles added to segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - profilesToAddStartTime); - profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity()); - if (profilesToAdd == null || profilesToAdd.getList().size() == 0) { - break; - } + updatedProfileCount += updateProfilesSegment(profilesToAddCondition, segmentId, true); + updatedProfileCount += updateProfilesSegment(profilesToRemoveCondition, segmentId, false); + } else { + updatedProfileCount += updateProfilesSegment(segmentCondition, segmentId, false); + } + logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime); + } + + private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd){ + long updatedProfileCount= 0; + PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); + + while (profiles != null && 
profiles.getList().size() > 0) { + long startTime = System.currentTimeMillis(); + if (batchSegmentProfileUpdate) { + batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd); } - while (profilesToRemove.getList().size() > 0) { - long profilesToRemoveStartTime = System.currentTimeMillis(); - for (Profile profileToRemove : profilesToRemove.getList()) { - profileToRemove.getSegments().remove(segment.getItemId()); - Map<String,Object> sourceMap = new HashMap<>(); - sourceMap.put("segments", profileToRemove.getSegments()); - profileToRemove.setSystemProperty("lastUpdated", new Date()); - sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove, null, Profile.class, sourceMap); - Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); - updatedProfileCount++; - } - logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime ); - profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity()); - if (profilesToRemove == null || profilesToRemove.getList().size() == 0) { - break; + else { //send update profile one by one + for (Profile profileToUpdate : profiles.getList()) { + Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd); + persistenceService.update(profileToUpdate, null, Profile.class, sourceMap); } } + if (sendProfileUpdateEventForSegmentUpdate) + sendProfileUpdatedEvent(profiles.getList()); - } else { - PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m"); - while (profilesToRemove.getList().size() > 0) { - long profilesToRemoveStartTime = System.currentTimeMillis(); - for (Profile 
profileToRemove : profilesToRemove.getList()) { - profileToRemove.getSegments().remove(segment.getItemId()); - Map<String,Object> sourceMap = new HashMap<>(); - sourceMap.put("segments", profileToRemove.getSegments()); - profileToRemove.setSystemProperty("lastUpdated", new Date()); - sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove, null, Profile.class, sourceMap); - Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); - updatedProfileCount++; - } - logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime); - profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity()); - if (profilesToRemove == null || profilesToRemove.getList().size() == 0) { - break; - } - } + updatedProfileCount += profiles.size(); + logger.info("{} profiles {} to segment {} in {}ms", profiles.size(), isAdd ? 
"added" : "removed", segmentId, System.currentTimeMillis() - startTime); + + profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity()); } - logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime); + + return updatedProfileCount; + } + + private void batchUpdateProfilesSegment(String segmentId, List<Profile> profiles, boolean isAdd) { + Map<Item, Map> profileToPropertiesMap = new HashMap<>(); + for (Profile profileToUpdate : profiles) { + Map<String,Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd); + profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate); + } + List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class); + if (failedItemsIds != null) + failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, isAdd)); + } + + private void retryFailedSegmentUpdate(String profileId, String segmentId, boolean isAdd){ + if (maxRetriesForUpdateProfileSegment > 0){ + RetryPolicy retryPolicy = new RetryPolicy() + .withDelay(Duration.ofSeconds(secondsDelayForRetryUpdateProfileSegment)) + .withMaxRetries(maxRetriesForUpdateProfileSegment); + + Failsafe.with(retryPolicy). 
+ run(executionContext -> { + logger.warn("retry updating profile segment {}, profile {}, time {}", segmentId, profileId, new Date()); + Profile profileToAddUpdated = persistenceService.load(profileId, Profile.class); + Map<String, Object> sourceMapToUpdate = buildPropertiesMapForUpdateSegment(profileToAddUpdated, segmentId, isAdd); + boolean isUpdated = persistenceService.update(profileToAddUpdated, null, Profile.class, sourceMapToUpdate); + if (isUpdated == false) + throw new Exception(String.format("failed retry update profile segment {}, profile {}, time {}", segmentId, profileId, new Date())); + }); + } + } + + private void sendProfileUpdatedEvent(List<Profile> profiles) { + for (Profile profileToAdd : profiles) { + sendProfileUpdatedEvent(profileToAdd); + } + } + + private void sendProfileUpdatedEvent(Profile profile) { + Event profileUpdated = new Event("profileUpdated", null, profile, null, null, profile, new Date()); + profileUpdated.setPersistent(false); + eventService.send(profileUpdated); + } + + private Map<String, Object> buildPropertiesMapForUpdateSegment(Profile profile, String segmentId, boolean isAdd) { + if (isAdd) + profile.getSegments().add(segmentId); + else + profile.getSegments().remove(segmentId); + + Map<String, Object> sourceMap = new HashMap<>(); + sourceMap.put("segments", profile.getSegments()); + profile.setSystemProperty("lastUpdated", new Date()); + sourceMap.put("systemProperties", profile.getSystemProperties()); + return sourceMap; } private void updateExistingProfilesForScoring(Scoring scoring) { diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 3e7b0eb..3e7f10b 100644 --- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -34,7 +34,11 @@ <cm:property name="definitions.refresh.interval" value="10000"/> <cm:property name="properties.refresh.interval" 
value="10000"/> <cm:property name="segment.refresh.interval" value="1000"/> + <cm:property name="segment.max.retries.update.profile.segment" value="5"/> + <cm:property name="segment.retry.update.segment.seconds.delay" value="1"/> <cm:property name="segment.recalculate.period" value="1"/> + <cm:property name="segment.batch.update" value="false"/> + <cm:property name="segment.send.profile.update.event" value="true"/> <cm:property name="rules.refresh.interval" value="1000"/> <cm:property name="rules.statistics.refresh.interval" value="10000"/> </cm:default-properties> @@ -174,6 +178,11 @@ <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" /> <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/> <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/> + <property name="maxRetriesForUpdateProfileSegment" value="${services.segment.max.retries.update.profile.segment}" /> + <property name="secondsDelayForRetryUpdateProfileSegment" value="${services.segment.retry.update.segment.seconds.delay}" /> + <property name="batchSegmentProfileUpdate" value="${services.segment.batch.update}" /> + <property name="sendProfileUpdateEventForSegmentUpdate" value="${services.segment.send.profile.update.event}" /> + </bean> <service id="segmentService" ref="segmentServiceImpl"> <interfaces> diff --git a/services/src/main/resources/org.apache.unomi.services.cfg b/services/src/main/resources/org.apache.unomi.services.cfg index 08295cb..37c9881 100644 --- a/services/src/main/resources/org.apache.unomi.services.cfg +++ b/services/src/main/resources/org.apache.unomi.services.cfg @@ -34,6 +34,18 @@ event.purge.existTime=${org.apache.unomi.event.purge.existTime:-12} # profiles that need updating segment.update.batchSize=${org.apache.unomi.segment.update.batchSize:-1000} +# When performing segment updates, can retry an update in case of an error to a single profile 
+segment.max.retries.update.profile.segment=${org.apache.unomi.services.segment.max.retries.update.profile.segment:-0} + +# Delay, in seconds, between retries of a segment update after a request has failed +segment.retry.update.segment.seconds.delay=${org.apache.unomi.services.segment.update.segment.retry.seconds.delay:-1} + +# Use a separate batch request for updating segments in profiles +segment.batch.update=${org.apache.unomi.segment.batch.update:-false} + +# Send a profileUpdated event for every profile whose segments are updated +segment.send.profile.update.event=${org.apache.unomi.segment.send.profile.update.event:-true} + # The interval in milliseconds to use to reload the definitions (condition types and action types) definitions.refresh.interval=${org.apache.unomi.definitions.refresh.interval:-10000}
