This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch avoidScrollDuringMerge in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 13afd61c372074140a02d962f2be71836e8e3755 Author: Kevan <ke...@jahia.com> AuthorDate: Thu May 4 23:34:03 2023 +0200 UNOMI-764: merge backport + no scroll queries anymore during merge --- .../org/apache/unomi/itests/ProfileMergeIT.java | 327 ++++++++++++++++++- .../main/resources/etc/custom.system.properties | 2 +- persistence-elasticsearch/core/pom.xml | 9 +- .../ElasticSearchPersistenceServiceImpl.java | 32 +- .../actions/MergeProfilesOnPropertyAction.java | 359 ++++++++++----------- .../META-INF/cxs/painless/updateProfileId.painless | 32 ++ .../resources/OSGI-INF/blueprint/blueprint.xml | 8 +- .../resources/org.apache.unomi.plugins.base.cfg | 2 +- .../services/impl/segments/SegmentServiceImpl.java | 21 -- 9 files changed, 560 insertions(+), 232 deletions(-) diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java index fb1cc40de..afe92c142 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java @@ -19,13 +19,11 @@ package org.apache.unomi.itests; import org.apache.unomi.api.Event; import org.apache.unomi.api.Metadata; import org.apache.unomi.api.Profile; +import org.apache.unomi.api.Session; import org.apache.unomi.api.actions.Action; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.rules.Rule; -import org.apache.unomi.api.services.DefinitionsService; -import org.apache.unomi.api.services.EventService; -import org.apache.unomi.api.services.ProfileService; -import org.apache.unomi.api.services.RulesService; +import org.apache.unomi.api.services.*; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -52,6 +50,8 @@ public class ProfileMergeIT extends BaseIT { @Inject @Filter(timeout = 600000) protected EventService eventService; @Inject @Filter(timeout = 600000) + protected PrivacyService privacyService; + @Inject @Filter(timeout = 600000) protected RulesService rulesService; @Inject @Filter(timeout = 600000) protected DefinitionsService definitionsService; @@ -63,14 +63,15 @@ public class ProfileMergeIT extends BaseIT { private final static String TEST_PROFILE_ID = "mergeOnPropertyTestProfileId"; @After - public void after() { + public void after() throws InterruptedException { // cleanup created data rulesService.removeRule(TEST_RULE_ID); + removeItems(Profile.class, Event.class, Session.class); } @Test public void testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws InterruptedException { - createAndWaitForRule(createMergeOnPropertyRule(false)); + createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename")); // A new profile should be created. Assert.assertNotEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID); @@ -78,12 +79,314 @@ public class ProfileMergeIT extends BaseIT { @Test public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster() throws InterruptedException { - createAndWaitForRule(createMergeOnPropertyRule(true)); + createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename")); // No new profile should be created, instead the profile of the event should be used. Assert.assertEquals(sendEvent().getProfile().getItemId(), TEST_PROFILE_ID); } + @Test + public void testProfileMergeOnPropertyAction_simpleMerge() throws InterruptedException { + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "usern...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "usern...@domain.com"); + profileService.save(masterProfile); + + // create event profile + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "usern...@domain.com"); + profileService.save(eventProfile); + + keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + Event event = new Event(TEST_EVENT_TYPE, null, eventProfile, null, null, eventProfile, new Date()); + + eventService.send(event); + + Assert.assertNotNull(event.getProfile()); + + keepTrying("Profile with id eventProfileID should still be accessible but marked as mergedWith", + () -> profileService.load("eventProfileID"), (profile -> profile != null && "masterProfileID".equals(profile.getMergedWith())), + DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + + + /** + * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B). + * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same + * In this case we will just switch user A profile to: + * - a new one, if it's the first time we encounter his own mergeIdentifier (TESTED in this scenario) + * - a previous one, if we already have a profile in DB with the same mergeIdentifier. + */ + @Test + public void testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws InterruptedException { + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "mas...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "mas...@domain.com"); + profileService.save(masterProfile); + + keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + // create event profile + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "ev...@domain.com"); + + Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null); + Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date()); + eventService.send(event); + + // Session should have been reassign and a new profile should have been created ! (We call this user switch case) + Assert.assertNotNull(event.getProfile()); + Assert.assertNotEquals("eventProfileID", event.getProfile().getItemId()); + Assert.assertNotEquals("eventProfileID", event.getProfileId()); + Assert.assertNotEquals("eventProfileID", event.getSession().getProfile().getItemId()); + Assert.assertNotEquals("eventProfileID", event.getSession().getProfileId()); + + Assert.assertNotEquals("masterProfileID", event.getProfile().getItemId()); + Assert.assertNotEquals("masterProfileID", event.getProfileId()); + Assert.assertNotEquals("masterProfileID", event.getSession().getProfile().getItemId()); + Assert.assertNotEquals("masterProfileID", event.getSession().getProfileId()); + + Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId()); + Assert.assertEquals("ev...@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier")); + } + + /** + * User switch case, this case can happen when a person (user A) is using the same browser session of a previous logged user (user B). + * user A will be using user B profile, but when user A is going to login by send a merge event, then we will detect that the mergeIdentifier is not the same + * In this case we will just switch user A profile to: + * - a new one, if it's the first time we encounter his own mergeIdentifier + * - a previous one, if we already have a profile in DB with the same mergeIdentifier. (TESTED in this scenario) + */ + @Test + public void testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws InterruptedException { + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "mas...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "mas...@domain.com"); + profileService.save(masterProfile); + + // create a previous existing profile with same mergeIdentifier + Profile previousProfile = new Profile(); + previousProfile.setItemId("previousProfileID"); + previousProfile.setProperty("email", "ev...@domain.com"); + previousProfile.setSystemProperty("mergeIdentifier", "ev...@domain.com"); + profileService.save(previousProfile); + + keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + keepTrying("Profile with id previousProfileID not found in the required time", () -> profileService.load("previousProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + // create event profile + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "ev...@domain.com"); + + Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null); + Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, null, null, eventProfile, new Date()); + eventService.send(event); + + // Session should have been reassign and the previous existing profile for mergeIdentifier: ev...@domain.com should have been reuse + // Session should have been reassign and a new profile should have been created ! (We call this user switch case) + Assert.assertNotNull(event.getProfile()); + Assert.assertEquals("previousProfileID", event.getProfile().getItemId()); + Assert.assertEquals("previousProfileID", event.getProfileId()); + Assert.assertEquals("previousProfileID", event.getSession().getProfile().getItemId()); + Assert.assertEquals("previousProfileID", event.getSession().getProfileId()); + + Assert.assertEquals(event.getSession().getProfileId(), event.getProfileId()); + Assert.assertEquals("ev...@domain.com", event.getProfile().getSystemProperties().get("mergeIdentifier")); + } + + /** + * In case of merge, existing sessions/events from previous profileId should be rewritten to use the new master profileId + */ + @Test + public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEvents() throws InterruptedException { + Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition")); + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "usern...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "usern...@domain.com"); + profileService.save(masterProfile); + + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "usern...@domain.com"); + profileService.save(eventProfile); + + // create 5 past sessions and 5 past events. + List<Session> sessionsToBeRewritten = new ArrayList<>(); + List<Event> eventsToBeRewritten = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null); + sessionsToBeRewritten.add(sessionToBeRewritten); + Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date()); + eventsToBeRewritten.add(eventToBeRewritten); + + + persistenceService.save(sessionToBeRewritten); + persistenceService.save(eventToBeRewritten); + } + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", session.getItemId(), null, Session.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", event.getItemId(), null, Event.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + // Trigger the merge + Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null); + Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date()); + eventService.send(mergeEvent); + + // Check that master profile is now used: + Assert.assertNotNull(mergeEvent.getProfile()); + Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId()); + Assert.assertEquals("masterProfileID", mergeEvent.getProfileId()); + Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfile().getItemId()); + Assert.assertEquals("masterProfileID", mergeEvent.getSession().getProfileId()); + Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId()); + Assert.assertEquals("usern...@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier")); + + // Check events are correctly rewritten + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for masterProfileID", + () -> persistenceService.load(event.getItemId(), Event.class), + (loadedEvent) -> loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + + // Check sessions are correctly rewritten + Condition sessionProfileIDRewrittenCondition = new Condition(definitionsService.getConditionType("sessionPropertyCondition")); + sessionProfileIDRewrittenCondition.setParameter("propertyName","profileId"); + sessionProfileIDRewrittenCondition.setParameter("comparisonOperator","equals"); + sessionProfileIDRewrittenCondition.setParameter("propertyValue","masterProfileID"); + keepTrying("Wait for sessions profileId to be rewritten to masterProfileID", + () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE), + (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID", + () -> persistenceService.load(session.getItemId(), Session.class), + (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + } + + /** + * If master profile is flagged as anonymous profile, then after the merge all past sessions/events should be anonymized + */ + @Test + public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEventsAnonymous() throws InterruptedException { + Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition")); + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "usern...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "usern...@domain.com"); + profileService.save(masterProfile); + privacyService.setRequireAnonymousBrowsing(masterProfile.getItemId(), true, null); + + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "usern...@domain.com"); + profileService.save(eventProfile); + + // create 5 sessions and 5 events for master profile. + List<Session> sessionsToBeRewritten = new ArrayList<>(); + List<Event> eventsToBeRewritten = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null); + sessionsToBeRewritten.add(sessionToBeRewritten); + Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date()); + eventsToBeRewritten.add(eventToBeRewritten); + + persistenceService.save(sessionToBeRewritten); + persistenceService.save(eventToBeRewritten); + } + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", session.getItemId(), null, Session.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", event.getItemId(), null, Event.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + keepTrying("Profile with id masterProfileID (should required anonymous browsing) not found in the required time", + () -> profileService.load("masterProfileID"), + profile -> profile != null && privacyService.isRequireAnonymousBrowsing(profile), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + // Trigger the merge + Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null); + Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date()); + eventService.send(mergeEvent); + + // Check that master profile is now used, but anonymous browsing is respected: + Assert.assertNotNull(mergeEvent.getProfile()); + Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId()); // We still have profile in the event + Assert.assertNull(mergeEvent.getProfileId()); // But profileId prop is null due to anonymous browsing + Assert.assertNull(mergeEvent.getSession().getProfile().getItemId()); // Same for the event session + Assert.assertNull(mergeEvent.getSession().getProfileId()); + Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId()); + Assert.assertEquals("usern...@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier")); + + // Check events are correctly rewritten (Anonymous !) + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing", + () -> persistenceService.load(event.getItemId(), Event.class), + (loadedEvent) -> loadedEvent.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + + // Check sessions are correctly rewritten (Anonymous !) + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing", + () -> persistenceService.load(session.getItemId(), Session.class), + (loadedSession) -> loadedSession.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + } + + /** + * Personalization strategy have a specific handling during the merge of two profiles + * This test is here to ensure this specific behavior is correctly working. + */ @Test public void testPersonalizationStrategyStatusMerge() { // create some statuses for the tests: @@ -157,6 +460,7 @@ public class ProfileMergeIT extends BaseIT { } } } + private Event sendEvent() { Profile profile = new Profile(); profile.setProperties(new HashMap<>()); @@ -168,16 +472,17 @@ public class ProfileMergeIT extends BaseIT { return testEvent; } - private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster) throws InterruptedException { + private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster, String eventProperty) throws InterruptedException { Rule mergeOnPropertyTestRule = new Rule(); - mergeOnPropertyTestRule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction")); + mergeOnPropertyTestRule + .setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing MergeProfilesOnPropertyAction")); Condition condition = new Condition(definitionsService.getConditionType("eventTypeCondition")); condition.setParameter("eventTypeId", TEST_EVENT_TYPE); mergeOnPropertyTestRule.setCondition(condition); - final Action mergeProfilesOnPropertyAction = new Action( definitionsService.getActionType( "mergeProfilesOnPropertyAction")); - mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(j:nodename)"); + final Action mergeProfilesOnPropertyAction = new Action(definitionsService.getActionType("mergeProfilesOnPropertyAction")); + mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", "eventProperty::target.properties(" + eventProperty + ")"); mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName", "mergeIdentifier"); mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster", forceEventProfileAsMaster); mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction)); diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index aa9c94290..0628777f2 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -314,7 +314,7 @@ org.apache.unomi.mail.server.sslOnConnect=${env:UNOMI_MAIL_SSLONCONNECT:-true} ####################################################################################################################### ## baseplugin settings ## ####################################################################################################################### -org.apache.unomi.plugins.base.maxProfilesInOneMerge=${env:UNOMI_MAX_PROFILES_IN_ONE_MERGE:--1} +org.apache.unomi.plugins.base.maxProfilesInOneMerge=${env:UNOMI_MAX_PROFILES_IN_ONE_MERGE:-50} ####################################################################################################################### ## Security settings ## diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 29a619cec..76759d181 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -189,6 +189,10 @@ <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -202,11 +206,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast-all</artifactId> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 80f333cdf..6c0a0fa63 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -19,6 +19,8 @@ package org.apache.unomi.persistence.elasticsearch; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -118,8 +120,10 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -436,11 +440,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } loadPredefinedMappings(bundleContext, false); + loadPainlessScripts(bundleContext); // load predefined mappings and condition dispatchers of any bundles that were started before this one. for (Bundle existingBundle : bundleContext.getBundles()) { if (existingBundle.getBundleContext() != null) { loadPredefinedMappings(existingBundle.getBundleContext(), false); + loadPainlessScripts(existingBundle.getBundleContext()); } } @@ -656,6 +662,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, switch (event.getType()) { case BundleEvent.STARTING: loadPredefinedMappings(event.getBundle().getBundleContext(), true); + loadPainlessScripts(event.getBundle().getBundleContext()); break; } } @@ -692,6 +699,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void loadPainlessScripts(BundleContext bundleContext) { + Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); + if (scriptsURL == null) { + return; + } + + Map<String, String> scriptsById = new HashMap<>(); + while (scriptsURL.hasMoreElements()) { + URL scriptURL = scriptsURL.nextElement(); + logger.info("Found painless script at " + scriptURL + ", loading... "); + try (InputStream in = scriptURL.openStream()) { + String script = IOUtils.toString(in, StandardCharsets.UTF_8); + String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); + scriptsById.put(scriptId, script); + } catch (Exception e) { + logger.error("Error while loading painless script " + scriptURL, e); + } + + } + + storeScripts(scriptsById); + } + private String loadMappingFile(URL predefinedMappingURL) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream())); @@ -977,7 +1007,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType, dateHint); + String index = getIndexNameForQuery(itemType); for (int i = 0; i < scripts.length; i++) { RefreshRequest refreshRequest = new RefreshRequest(index); diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 7a3782079..3d90f6317 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -18,13 +18,9 @@ package org.apache.unomi.plugins.baseplugin.actions; import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.api.Event; -import org.apache.unomi.api.Persona; -import org.apache.unomi.api.Profile; -import org.apache.unomi.api.Session; +import org.apache.unomi.api.*; import org.apache.unomi.api.actions.Action; import org.apache.unomi.api.actions.ActionExecutor; -import org.apache.unomi.api.actions.ActionPostExecutor; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.services.*; import org.apache.unomi.persistence.spi.PersistenceService; @@ -35,6 +31,8 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class MergeProfilesOnPropertyAction implements ActionExecutor { private static final Logger logger = LoggerFactory.getLogger(MergeProfilesOnPropertyAction.class.getName()); @@ -45,223 +43,203 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { private DefinitionsService definitionsService; private PrivacyService privacyService; private ConfigSharingService configSharingService; - private int maxProfilesInOneMerge = -1; + private SchedulerService schedulerService; + // TODO we can remove this limit after dealing with: UNOMI-776 (50 is completely arbitrary and it's used to bypass the auto-scroll done by the persistence Service) + private int maxProfilesInOneMerge = 50; public int execute(Action action, Event event) { - String profileIdCookieName = (String) configSharingService.getProperty("profileIdCookieName"); - String profileIdCookieDomain = (String) configSharingService.getProperty("profileIdCookieDomain"); - Integer profileIdCookieMaxAgeInSeconds = (Integer) configSharingService.getProperty("profileIdCookieMaxAgeInSeconds"); - Boolean profileIdCookieHttpOnly = (Boolean) configSharingService.getProperty("profileIdCookieHttpOnly"); + HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE); + HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE); - Profile profile = event.getProfile(); - if (profile instanceof Persona || profile.isAnonymousProfile()) { - return EventService.NO_CHANGE; - } + Profile eventProfile = event.getProfile(); + final String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName"); + final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue"); + boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false; + final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName); - final String mergeProfilePropertyName = (String) action.getParameterValues().get("mergeProfilePropertyName"); - if (StringUtils.isEmpty(mergeProfilePropertyName)) { + if (eventProfile instanceof Persona || eventProfile.isAnonymousProfile() || StringUtils.isEmpty(mergePropName) || + StringUtils.isEmpty(mergePropValue)) { return EventService.NO_CHANGE; } - final String mergeProfilePropertyValue = (String) action.getParameterValues().get("mergeProfilePropertyValue"); - if (StringUtils.isEmpty(mergeProfilePropertyValue)) { - return EventService.NO_CHANGE; - } + final List<Profile> profilesToBeMerge = getProfilesToBeMerge(mergePropName, mergePropValue); - final String mergeProfilePreviousPropertyValue = profile.getSystemProperties().get(mergeProfilePropertyName) != null ? profile.getSystemProperties().get(mergeProfilePropertyName).toString() : ""; - - final Session currentSession = event.getSession(); + // Check if the user switched to another profile + if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) { + String reassignProfileId = reassignCurrentBrowsingData(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue); + sendProfileCookie(reassignProfileId, httpServletResponse, httpServletRequest); - boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? - (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false; + return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; + } - // store the profile id in case the merge change it to a previous one - String profileId = profile.getItemId(); + // Store merge prop on current profile + boolean profileUpdated = false; + if (StringUtils.isEmpty(currentProfileMergeValue)) { + profileUpdated = true; + eventProfile.getSystemProperties().put(mergePropName, mergePropValue); + } - Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); - propertyCondition.setParameter("comparisonOperator", "equals"); - propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName); - propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue); + // If not profiles to merge we are done here. + if (profilesToBeMerge.isEmpty()) { + return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; + } - Condition excludeMergedProfilesCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); - excludeMergedProfilesCondition.setParameter("comparisonOperator", "missing"); - excludeMergedProfilesCondition.setParameter("propertyName", "mergedWith"); + // add current Profile to profiles to be merged + if (profilesToBeMerge.stream().noneMatch(p -> StringUtils.equals(p.getItemId(), eventProfile.getItemId()))) { + profilesToBeMerge.add(eventProfile); + } - Condition c = new Condition(definitionsService.getConditionType("booleanCondition")); - c.setParameter("operator", "and"); - c.setParameter("subConditions", Arrays.asList(propertyCondition, excludeMergedProfilesCondition)); + final String eventProfileId = eventProfile.getItemId(); + final Profile masterProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge); + final String masterProfileId = masterProfile.getItemId(); - final List<Profile> profiles = persistenceService.query(c, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList(); + // Profile is still using the same profileId after being merged, no need to rewrite exists data, merge is done + if (!forceEventProfileAsMaster && masterProfileId.equals(eventProfileId)) { + return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; + } - // Check if the user switched to another profile - if (StringUtils.isNotEmpty(mergeProfilePreviousPropertyValue) && !mergeProfilePreviousPropertyValue.equals(mergeProfilePropertyValue)) { - if (profiles.size() > 0) { - // Take existing profile - profile = profiles.get(0); - } else { - // Create a new profile - if (forceEventProfileAsMaster) - profile = event.getProfile(); - else { - profile = new Profile(UUID.randomUUID().toString()); - profile.setProperty("firstVisit", event.getTimeStamp()); - } - profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue); - } + // ProfileID changed we have a lot to do + // First check for privacy stuff (inherit from previous profile if necessary) + if (privacyService.isRequireAnonymousBrowsing(eventProfile)) { + privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope()); + } + final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId); - logger.info("Different users, switch to " + profile.getItemId()); + // we still send back the current profile cookie. It will be changed on the next request to the ContextServlet. + // The current profile will be deleted only then because we cannot delete it right now (too soon) + sendProfileCookie(eventProfileId, httpServletResponse, httpServletRequest); - HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE); - HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE); - if (httpServletRequest != null) { - sendProfileCookie(profile, httpServletResponse, profileIdCookieName, profileIdCookieDomain, - profileIdCookieMaxAgeInSeconds, profileIdCookieHttpOnly, httpServletRequest.isSecure()); - } + // Modify current session: + if (event.getSession() != null) { + event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(masterProfile) : masterProfile); + } - // At the end of the merge, we must set the merged profile as profile event to process other Actions - event.setProfileId(profile.getItemId()); - event.setProfile(profile); + // Modify current event: + event.setProfileId(anonymousBrowsing ? null : masterProfileId); + event.setProfile(masterProfile); - if (currentSession != null) { - currentSession.setProfile(profile); - eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession, - null, event.getTimeStamp(), false)); - } + event.getActionPostExecutors().add(() -> { + try { + // This is the list of profile Ids to be updated in browsing data (events/sessions) + List<Profile> mergedProfiles = profilesToBeMerge.stream() + .filter(mergedProfile -> !StringUtils.equals(mergedProfile.getItemId(), masterProfileId)) + .collect(Collectors.toList()); - return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; - } else { - // Store the merge property identifier in the profile - profile.getSystemProperties().put(mergeProfilePropertyName, mergeProfilePropertyValue); + // ASYNC: Update browsing data (events/sessions) for merged profiles + reassignPersistedBrowsingDatasAsync(anonymousBrowsing, mergedProfiles.stream().map(Item::getItemId).collect(Collectors.toList()), masterProfileId); - // add current Profile to profiles to be merged - boolean add = true; - for (Profile p : profiles) { - add = add && !StringUtils.equals(p.getItemId(), profile.getItemId()); - } - if (add) { - profiles.add(profile); - } + // Save event, as we dynamically changed the profileId of the current event + if (event.isPersistent()) { + persistenceService.save(event); + } - if (profiles.size() == 1) { - return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; + // we must mark all the profiles that we merged into the master as merged with the master, and they will + // be deleted upon next load + for (Profile mergedProfile : mergedProfiles) { + mergedProfile.setMergedWith(masterProfileId); + mergedProfile.setSystemProperty("lastUpdated", new Date()); + + boolean isExist = persistenceService.load(mergedProfile.getItemId(), Profile.class) != null; + if (!isExist) { + //save the original event profile is it has been changed + persistenceService.save(mergedProfile); + } else { + Map<String,Object> sourceMap = new HashMap<>(); + sourceMap.put("mergedWith", masterProfileId); + sourceMap.put("systemProperties", mergedProfile.getSystemProperties()); + persistenceService.update(mergedProfile, null, Profile.class, sourceMap,true); + } + } + } catch (Exception e) { + logger.error("unable to execute callback action, profile and session will not be saved", e); + return false; } + return true; + }); - Profile markedMasterProfile; - if (forceEventProfileAsMaster) - markedMasterProfile = event.getProfile(); - else - markedMasterProfile = profiles.get(0);// Use oldest profile for master profile - - final Profile masterProfile = profileService.mergeProfiles(markedMasterProfile, profiles); - - // Profile has changed - if (forceEventProfileAsMaster || !masterProfile.getItemId().equals(profileId)) { - HttpServletResponse httpServletResponse = (HttpServletResponse) event.getAttributes().get(Event.HTTP_RESPONSE_ATTRIBUTE); - HttpServletRequest httpServletRequest = (HttpServletRequest) event.getAttributes().get(Event.HTTP_REQUEST_ATTRIBUTE); - // we still send back the current profile cookie. It will be changed on the next request to the ContextServlet. - // The current profile will be deleted only then because we cannot delete it right now (too soon) - if (httpServletRequest != null) { - sendProfileCookie(profile, httpServletResponse, profileIdCookieName, profileIdCookieDomain, - profileIdCookieMaxAgeInSeconds, profileIdCookieHttpOnly, httpServletRequest.isSecure()); - } + return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; + } - final String masterProfileId = masterProfile.getItemId(); - // At the end of the merge, we must set the merged profile as profile event to process other Actions - event.setProfileId(masterProfileId); - event.setProfile(masterProfile); + private void sendProfileCookie(String profileId, HttpServletResponse response, HttpServletRequest request) { + if (response != null && request != null) { + String profileIdCookieName = (String) configSharingService.getProperty("profileIdCookieName"); + String profileIdCookieDomain = (String) configSharingService.getProperty("profileIdCookieDomain"); + Integer profileIdCookieMaxAgeInSeconds = (Integer) configSharingService.getProperty("profileIdCookieMaxAgeInSeconds"); + Boolean profileIdCookieHttpOnly = (Boolean) configSharingService.getProperty("profileIdCookieHttpOnly"); + + response.addHeader("Set-Cookie", + profileIdCookieName + "=" + profileId + + "; Path=/" + + "; Max-Age=" + profileIdCookieMaxAgeInSeconds + + (StringUtils.isNotBlank(profileIdCookieDomain) ? ("; Domain=" + profileIdCookieDomain) : "") + + "; SameSite=Lax" + + (request.isSecure() ? "; Secure" : "") + + (profileIdCookieHttpOnly ? "; HttpOnly" : "")); + } + } - final Boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId); + private List<Profile> getProfilesToBeMerge(String mergeProfilePropertyName, String mergeProfilePropertyValue) { + Condition propertyCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + propertyCondition.setParameter("comparisonOperator", "equals"); + propertyCondition.setParameter("propertyName", "systemProperties." + mergeProfilePropertyName); + propertyCondition.setParameter("propertyValue", mergeProfilePropertyValue); - if (currentSession != null){ - currentSession.setProfile(masterProfile); - if (privacyService.isRequireAnonymousBrowsing(profile)) { - privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope()); - } + return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList(); + } - if (anonymousBrowsing) { - currentSession.setProfile(privacyService.getAnonymousProfile(masterProfile)); - event.setProfileId(null); - persistenceService.save(event); + private void reassignPersistedBrowsingDatasAsync(boolean anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) { + schedulerService.getSharedScheduleExecutorService().schedule(new TimerTask() { + @Override + public void run() { + if (!anonymousBrowsing) { + Condition profileIdsCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition")); + profileIdsCondition.setParameter("propertyName","profileId"); + profileIdsCondition.setParameter("comparisonOperator","in"); + profileIdsCondition.setParameter("propertyValues", mergedProfileIds); + + String[] scripts = new String[]{"updateProfileId"}; + Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)}; + Condition[] conditions = new Condition[]{profileIdsCondition}; + + persistenceService.updateWithQueryAndStoredScript(null, Session.class, scripts, scriptParams, conditions); + persistenceService.updateWithQueryAndStoredScript(null, Event.class, scripts, scriptParams, conditions); + } else { + for (String mergedProfileId : mergedProfileIds) { + privacyService.anonymizeBrowsingData(mergedProfileId); } } - - event.getActionPostExecutors().add(new ActionPostExecutor() { - @Override - public boolean execute() { - try { - Event currentEvent = event; - // Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES - if (!StringUtils.equals(profileId, masterProfileId)) { - if (currentEvent.isPersistent()) { - persistenceService.update(currentEvent, currentEvent.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId); - } } - - for (Profile profile : profiles) { - String profileId = profile.getItemId(); - if (!StringUtils.equals(profileId, masterProfileId)) { - List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class); - if (currentSession != null){ - if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) { - sessions.add(currentSession); - } - } - - for (Session session : sessions) { - persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId); - } - - List<Event> events = persistenceService.query("profileId", profileId, null, Event.class); - for (Event event : events) { - if (!event.getItemId().equals(currentEvent.getItemId())) { - persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId); - } - } - // we must mark all the profiles that we merged into the master as merged with the master, and they will - // be deleted upon next load - profile.setMergedWith(masterProfileId); - Map<String,Object> sourceMap = new HashMap<>(); - sourceMap.put("mergedWith", masterProfileId); - profile.setSystemProperty("lastUpdated", new Date()); - sourceMap.put("systemProperties", profile.getSystemProperties()); - - boolean isExist = persistenceService.load(profile.getItemId(), Profile.class) != null; - - if (isExist == false) //save the original event profile is it has been changed - persistenceService.save(profile); - else - persistenceService.update(profile, null, Profile.class, sourceMap,true); - - } - } - } catch (Exception e) { - logger.error("unable to execute callback action, profile and session will not be saved", e); - return false; - } - return true; - } - }); - return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; - } else { - return StringUtils.isEmpty(mergeProfilePreviousPropertyValue) ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; } - } + }, 1000, TimeUnit.MILLISECONDS); } - private static void sendProfileCookie(Profile profile, ServletResponse response, String profileIdCookieName, String profileIdCookieDomain, - int cookieAgeInSeconds, boolean httpOnly, boolean secure) { - if (response != null && response instanceof HttpServletResponse) { - HttpServletResponse httpServletResponse = (HttpServletResponse) response; - if (!(profile instanceof Persona)) { - httpServletResponse.addHeader("Set-Cookie", - profileIdCookieName + "=" + profile.getItemId() + - "; Path=/" + - "; Max-Age=" + cookieAgeInSeconds + - (StringUtils.isNotBlank(profileIdCookieDomain) ? ("; Domain=" + profileIdCookieDomain) : "") + - "; SameSite=Lax" + - (secure ? "; Secure" : "") + - (httpOnly ? "; HttpOnly" : "")); + private String reassignCurrentBrowsingData(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) { + Profile eventProfile = event.getProfile(); + + if (existingMergedProfiles.size() > 0) { + // Take existing profile + eventProfile = existingMergedProfiles.get(0); + } else { + if (!forceEventProfileAsMaster) { + // Create a new profile + eventProfile = new Profile(UUID.randomUUID().toString()); + eventProfile.setProperty("firstVisit", event.getTimeStamp()); } + eventProfile.getSystemProperties().put(mergePropName, mergePropValue); } + + logger.info("Different users, switch to {}", eventProfile.getItemId()); + // At the end of the merge, we must set the merged profile as profile event to process other Actions + event.setProfileId(eventProfile.getItemId()); + event.setProfile(eventProfile); + + if (event.getSession() != null) { + Session eventSession = event.getSession(); + eventSession.setProfile(eventProfile); + eventService.send(new Event("sessionReassigned", eventSession, eventProfile, event.getScope(), event, eventSession, + null, event.getTimeStamp(), false)); + } + + return eventProfile.getItemId(); } public void setProfileService(ProfileService profileService) { @@ -288,8 +266,11 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { this.configSharingService = configSharingService; } + public void setSchedulerService(SchedulerService schedulerService) { + this.schedulerService = schedulerService; + } + public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) { this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge); } - -} +} \ No newline at end of file diff --git a/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless new file mode 100644 index 000000000..31f900c62 --- /dev/null +++ b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + This script is used to update the profileId and profile.profileId in sessions and/or events after a merge situation + required params: + - params.profileId: the ID of the new profileId +*/ + +// update profileId +if (ctx._source.containsKey("profileId") && ctx._source.profileId != params.profileId) { + ctx._source.put("profileId", params.profileId) +} + +// update inner profile.profileId if the inner profile exists (in sessions for example) +if (ctx._source.containsKey("profile") && ctx._source.profile.containsKey("itemId") && ctx._source.profile.itemId != params.profileId) { + ctx._source.profile.put("itemId", params.profileId) +} \ No newline at end of file diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 1a067ba0e..ff7bcb6c5 100644 --- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -26,7 +26,7 @@ <cm:default-properties> <cm:property name="useEventToUpdateProfile" value="false" /> <cm:property name="usePropertyConditionOptimizations" value="true" /> - <cm:property name="maxProfilesInOneMerge" value="-1"/> + <cm:property name="maxProfilesInOneMerge" value="50"/> </cm:default-properties> </cm:property-placeholder> @@ -43,9 +43,10 @@ <reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/> <reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/> <reference id="privacyService" interface="org.apache.unomi.api.services.PrivacyService"/> + <reference id="schedulerService" interface="org.apache.unomi.api.services.SchedulerService"/> <reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/> <reference id="eventService" interface="org.apache.unomi.api.services.EventService"/> - <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" /> + <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService"/> <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor"/> <service @@ -115,7 +116,7 @@ <property name="definitionsService" ref="definitionsService"/> <property name="persistenceService" ref="persistenceService"/> <property name="segmentService" ref="segmentService"/> - <property name="scriptExecutor" ref="scriptExecutor" /> + <property name="scriptExecutor" ref="scriptExecutor"/> <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/> <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/> <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/> @@ -298,6 +299,7 @@ <property name="definitionsService" ref="definitionsService"/> <property name="privacyService" ref="privacyService"/> <property name="configSharingService" ref="configSharingService" /> + <property name="schedulerService" ref="schedulerService"/> <property name="maxProfilesInOneMerge" value="${base.maxProfilesInOneMerge}"/> </bean> </service> diff --git a/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg b/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg index 41f80a950..b0bad3b5c 100644 --- a/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg +++ b/plugins/baseplugin/src/main/resources/org.apache.unomi.plugins.base.cfg @@ -15,4 +15,4 @@ # limitations under the License. # -maxProfilesInOneMerge=${org.apache.unomi.plugins.base.maxProfilesInOneMerge:--1} +maxProfilesInOneMerge=${org.apache.unomi.plugins.base.maxProfilesInOneMerge:-50} diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 377631c1f..e4794bceb 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -22,7 +22,6 @@ import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.CharEncoding; import org.apache.unomi.api.Event; import org.apache.unomi.api.Item; import org.apache.unomi.api.Metadata; @@ -163,7 +162,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } bundleContext.addBundleListener(this); initializeTimer(); - loadScripts(); logger.info("Segment service initialized."); } @@ -1251,25 +1249,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS); } - private void loadScripts() throws IOException { - Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); - if (scriptsURL == null) { - return; - } - - Map<String, String> scriptsById = new HashMap<>(); - while (scriptsURL.hasMoreElements()) { - URL scriptURL = scriptsURL.nextElement(); - logger.debug("Found painless script at " + scriptURL + ", loading... "); - try (InputStream in = scriptURL.openStream()) { - String script = IOUtils.toString(in, StandardCharsets.UTF_8); - String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); - scriptsById.put(scriptId, script); - } - } - persistenceService.storeScripts(scriptsById); - } - public void setTaskExecutionPeriod(long taskExecutionPeriod) { this.taskExecutionPeriod = taskExecutionPeriod; }