This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new ba58fafde UNOMI-748, UNOMI-749: more improvements for merge system:
updateWithS… (#593)
ba58fafde is described below
commit ba58fafde122ac82d9764329edc620fda2b936bc
Author: kevan Jahanshahi <[email protected]>
AuthorDate: Thu Mar 23 09:33:11 2023 +0100
UNOMI-748, UNOMI-749: more improvements for merge system: updateWithS…
(#593)
* UNOMI-748, UNOMI-749: more improvements for merge system:
updateWithScript to reassign past sessions/events, fix inconsistency in sessions
profileId, use schedulerService to support asynchronous update of past browsing
data.
* UNOMI-748, UNOMI-749: add more tests
---
.../unomi/privacy/internal/PrivacyServiceImpl.java | 5 -
.../test/java/org/apache/unomi/itests/BaseIT.java | 2 +
.../org/apache/unomi/itests/ProfileMergeIT.java | 100 ++++++++++++++++++--
persistence-elasticsearch/core/pom.xml | 9 +-
.../ElasticSearchPersistenceServiceImpl.java | 30 ++++++
.../actions/MergeProfilesOnPropertyAction.java | 105 ++++++++++++---------
.../META-INF/cxs/painless/updateProfileId.painless | 32 +++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 2 +
services/pom.xml | 4 -
.../services/impl/segments/SegmentServiceImpl.java | 25 -----
10 files changed, 224 insertions(+), 90 deletions(-)
diff --git
a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index fb357d559..5d6abb828 100644
---
a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++
b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -127,11 +127,6 @@ public class PrivacyServiceImpl implements PrivacyService {
@Override
public Boolean anonymizeBrowsingData(String profileId) {
- Profile profile = profileService.load(profileId);
- if (profile == null) {
- return false;
- }
-
List<Session> sessions = profileService.getProfileSessions(profileId,
null, 0, -1, null).getList();
if (sessions.isEmpty()) {
return false;
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 8c166b33c..f62d32d8a 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -140,6 +140,7 @@ public abstract class BaseIT extends KarafTestSupport {
protected RulesService rulesService;
protected DefinitionsService definitionsService;
protected ProfileService profileService;
+ protected PrivacyService privacyService;
protected EventService eventService;
protected BundleWatcher bundleWatcher;
protected GroovyActionsService groovyActionsService;
@@ -184,6 +185,7 @@ public abstract class BaseIT extends KarafTestSupport {
rulesService = getOsgiService(RulesService.class, 600000);
definitionsService = getOsgiService(DefinitionsService.class, 600000);
profileService = getOsgiService(ProfileService.class, 600000);
+ privacyService = getOsgiService(PrivacyService.class, 600000);
eventService = getOsgiService(EventService.class, 600000);
groovyActionsService = getOsgiService(GroovyActionsService.class,
600000);
segmentService = getOsgiService(SegmentService.class, 600000);
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index 7815baccb..43c0b0429 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -220,7 +220,7 @@ public class ProfileMergeIT extends BaseIT {
* In case of merge, existing sessions/events from previous profileId
should be rewritten to use the new master profileId
*/
@Test
- public void
testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents()
throws InterruptedException {
+ public void
testProfileMergeOnPropertyAction_rewriteExistingSessionsEvents() throws
InterruptedException {
Condition matchAll = new
Condition(definitionsService.getConditionType("matchAllCondition"));
// create rule
createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
@@ -237,7 +237,7 @@ public class ProfileMergeIT extends BaseIT {
eventProfile.setProperty("email", "[email protected]");
profileService.save(eventProfile);
- // create 5 sessions and 5 events for master profile.
+ // create 5 past sessions and 5 past events.
List<Session> sessionsToBeRewritten = new ArrayList<>();
List<Event> eventsToBeRewritten = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
@@ -250,8 +250,16 @@ public class ProfileMergeIT extends BaseIT {
persistenceService.save(sessionToBeRewritten);
persistenceService.save(eventToBeRewritten);
}
- keepTrying("Wait for sessions and events to be persisted", () ->
persistenceService.queryCount(matchAll, Session.ITEM_TYPE) +
persistenceService.queryCount(matchAll, Event.ITEM_TYPE),
- (count) -> count == 10, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " to be
indexed",
+ () -> persistenceService.query("itemId",
session.getItemId(), null, Session.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " to be
indexed",
+ () -> persistenceService.query("itemId",
event.getItemId(), null, Event.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
keepTrying("Profile with id masterProfileID not found in the required
time", () -> profileService.load("masterProfileID"),
Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
keepTrying("Profile with id eventProfileID not found in the required
time", () -> profileService.load("eventProfileID"),
@@ -287,12 +295,90 @@ public class ProfileMergeIT extends BaseIT {
() ->
persistenceService.queryCount(sessionProfileIDRewrittenCondition,
Session.ITEM_TYPE),
(count) -> count == 5, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
- // TODO uncomment this when UNOMI-749 is fixed, currently session
loaded are inconsistent
- /* for (Session session : sessionsToBeRewritten) {
+ for (Session session : sessionsToBeRewritten) {
keepTrying("Wait for session: " + session.getItemId() + "
profileId to be rewritten for masterProfileID",
() -> persistenceService.load(session.getItemId(),
Session.class),
(loadedSession) ->
loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
- } */
+ }
+ }
+
+ /**
+ * If master profile is flagged as anonymous profile, then after the merge
all past sessions/events should be anonymized
+ */
+ @Test
+ public void
testProfileMergeOnPropertyAction_rewriteExistingSessionsEventsAnonymous()
throws InterruptedException {
+ Condition matchAll = new
Condition(definitionsService.getConditionType("matchAllCondition"));
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "[email protected]");
+ masterProfile.setSystemProperty("mergeIdentifier",
"[email protected]");
+ profileService.save(masterProfile);
+ privacyService.setRequireAnonymousBrowsing(masterProfile.getItemId(),
true, null);
+
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "[email protected]");
+ profileService.save(eventProfile);
+
+ // create 5 past sessions and 5 past events (owned by the event profile).
+ List<Session> sessionsToBeRewritten = new ArrayList<>();
+ List<Event> eventsToBeRewritten = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ Session sessionToBeRewritten = new Session("simpleSession_"+ i,
eventProfile, new Date(), null);
+ sessionsToBeRewritten.add(sessionToBeRewritten);
+ Event eventToBeRewritten = new Event("view", sessionToBeRewritten,
eventProfile, null, null, null, new Date());
+ eventsToBeRewritten.add(eventToBeRewritten);
+
+ persistenceService.save(sessionToBeRewritten);
+ persistenceService.save(eventToBeRewritten);
+ }
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + " to be
indexed",
+ () -> persistenceService.query("itemId",
session.getItemId(), null, Session.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " to be
indexed",
+ () -> persistenceService.query("itemId",
event.getItemId(), null, Event.class),
+ (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
+ keepTrying("Profile with id masterProfileID (should required anonymous
browsing) not found in the required time",
+ () -> profileService.load("masterProfileID"),
+ profile -> profile != null &&
privacyService.isRequireAnonymousBrowsing(profile), DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required
time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+
+ // Trigger the merge
+ Session simpleSession = new Session("simpleSession", eventProfile, new
Date(), null);
+ Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession,
eventProfile, null, null, eventProfile, new Date());
+ eventService.send(mergeEvent);
+
+ // Check that master profile is now used, but anonymous browsing is
respected:
+ Assert.assertNotNull(mergeEvent.getProfile());
+ Assert.assertEquals("masterProfileID",
mergeEvent.getProfile().getItemId()); // We still have profile in the event
+ Assert.assertNull(mergeEvent.getProfileId()); // But profileId prop is
null due to anonymous browsing
+ Assert.assertNull(mergeEvent.getSession().getProfile().getItemId());
// Same for the event session
+ Assert.assertNull(mergeEvent.getSession().getProfileId());
+ Assert.assertEquals(mergeEvent.getSession().getProfileId(),
mergeEvent.getProfileId());
+ Assert.assertEquals("[email protected]",
mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+ // Check events are correctly rewritten (Anonymous !)
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " profileId to
be rewritten for NULL due to anonymous browsing",
+ () -> persistenceService.load(event.getItemId(),
Event.class),
+ (loadedEvent) -> loadedEvent.getProfileId() == null,
DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ // Check sessions are correctly rewritten (Anonymous !)
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + "
profileId to be rewritten for NULL due to anonymous browsing",
+ () -> persistenceService.load(session.getItemId(),
Session.class),
+ (loadedSession) -> loadedSession.getProfileId() == null,
DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
}
/**
diff --git a/persistence-elasticsearch/core/pom.xml
b/persistence-elasticsearch/core/pom.xml
index 8aa4fd7ba..770c6cea8 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -196,6 +196,10 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
@@ -209,11 +213,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-all</artifactId>
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 9de538795..4562dd5cc 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,6 +19,8 @@ package org.apache.unomi.persistence.elasticsearch;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -116,8 +118,10 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
@@ -460,11 +464,13 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
registerRolloverLifecyclePolicy();
loadPredefinedMappings(bundleContext, false);
+ loadPainlessScripts(bundleContext);
// load predefined mappings and condition dispatchers of any
bundles that were started before this one.
for (Bundle existingBundle : bundleContext.getBundles()) {
if (existingBundle.getBundleContext() != null) {
loadPredefinedMappings(existingBundle.getBundleContext(), false);
+ loadPainlessScripts(existingBundle.getBundleContext());
}
}
@@ -690,6 +696,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
switch (event.getType()) {
case BundleEvent.STARTING:
loadPredefinedMappings(event.getBundle().getBundleContext(),
true);
+ loadPainlessScripts(event.getBundle().getBundleContext());
break;
}
}
@@ -722,6 +729,29 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ private void loadPainlessScripts(BundleContext bundleContext) {
+ Enumeration<URL> scriptsURL =
bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless",
true);
+ if (scriptsURL == null) {
+ return;
+ }
+
+ Map<String, String> scriptsById = new HashMap<>();
+ while (scriptsURL.hasMoreElements()) {
+ URL scriptURL = scriptsURL.nextElement();
+ logger.info("Found painless script at " + scriptURL + ",
loading... ");
+ try (InputStream in = scriptURL.openStream()) {
+ String script = IOUtils.toString(in, StandardCharsets.UTF_8);
+ String scriptId =
FilenameUtils.getBaseName(scriptURL.getPath());
+ scriptsById.put(scriptId, script);
+ } catch (Exception e) {
+ logger.error("Error while loading painless script " +
scriptURL, e);
+ }
+
+ }
+
+ storeScripts(scriptsById);
+ }
+
private String loadMappingFile(URL predefinedMappingURL) throws
IOException {
BufferedReader reader = new BufferedReader(new
InputStreamReader(predefinedMappingURL.openStream()));
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 6ecd0e797..e52c8fd84 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -30,8 +30,9 @@ import org.apache.unomi.persistence.spi.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class MergeProfilesOnPropertyAction implements ActionExecutor {
private static final Logger logger =
LoggerFactory.getLogger(MergeProfilesOnPropertyAction.class.getName());
@@ -41,6 +42,7 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
private EventService eventService;
private DefinitionsService definitionsService;
private PrivacyService privacyService;
+ private SchedulerService schedulerService;
private int maxProfilesInOneMerge = -1;
public int execute(Action action, Event event) {
@@ -48,6 +50,8 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
Profile eventProfile = event.getProfile();
final String mergePropName = (String)
action.getParameterValues().get("mergeProfilePropertyName");
final String mergePropValue = (String)
action.getParameterValues().get("mergeProfilePropertyValue");
+ final String clientIdFromEvent = (String)
event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
+ final String clientId = clientIdFromEvent != null ? clientIdFromEvent
: "defaultClientId";
boolean forceEventProfileAsMaster =
action.getParameterValues().containsKey("forceEventProfileAsMaster") ?
(boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false;
final String currentProfileMergeValue = (String)
eventProfile.getSystemProperties().get(mergePropName);
@@ -60,7 +64,7 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
// Check if the user switched to another profile
if (StringUtils.isNotEmpty(currentProfileMergeValue) &&
!currentProfileMergeValue.equals(mergePropValue)) {
- reassignSession(event, profilesToBeMerge,
forceEventProfileAsMaster, mergePropName, mergePropValue);
+ reassignCurrentBrowsingData(event, profilesToBeMerge,
forceEventProfileAsMaster, mergePropName, mergePropValue);
return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
}
@@ -82,67 +86,51 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
}
final String eventProfileId = eventProfile.getItemId();
- final Profile mergedProfile =
profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile :
profilesToBeMerge.get(0), profilesToBeMerge);
- final String mergedProfileId = mergedProfile.getItemId();
+ final Profile masterProfile =
profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile :
profilesToBeMerge.get(0), profilesToBeMerge);
+ final String masterProfileId = masterProfile.getItemId();
// Profile is still using the same profileId after being merged, no
need to rewrite exists data, merge is done
- if (!forceEventProfileAsMaster &&
mergedProfileId.equals(eventProfileId)) {
+ if (!forceEventProfileAsMaster &&
masterProfileId.equals(eventProfileId)) {
return profileUpdated ? EventService.PROFILE_UPDATED :
EventService.NO_CHANGE;
}
// ProfileID changed we have a lot to do
// First check for privacy stuff (inherit from previous profile if
necessary)
if (privacyService.isRequireAnonymousBrowsing(eventProfile)) {
- privacyService.setRequireAnonymousBrowsing(mergedProfileId, true,
event.getScope());
+ privacyService.setRequireAnonymousBrowsing(masterProfileId, true,
event.getScope());
}
- final boolean anonymousBrowsing =
privacyService.isRequireAnonymousBrowsing(mergedProfileId);
+ final boolean anonymousBrowsing =
privacyService.isRequireAnonymousBrowsing(masterProfileId);
// Modify current session:
if (event.getSession() != null) {
- event.getSession().setProfile(anonymousBrowsing ?
privacyService.getAnonymousProfile(mergedProfile) : mergedProfile);
+ event.getSession().setProfile(anonymousBrowsing ?
privacyService.getAnonymousProfile(masterProfile) : masterProfile);
}
// Modify current event:
- event.setProfileId(anonymousBrowsing ? null : mergedProfileId);
- event.setProfile(mergedProfile);
+ event.setProfileId(anonymousBrowsing ? null : masterProfileId);
+ event.setProfile(masterProfile);
event.getActionPostExecutors().add(() -> {
try {
- // Save event, as we changed the profileId of the current event
+ // This is the list of profile Ids to be updated in browsing
data (events/sessions)
+ List<String> mergedProfileIds = profilesToBeMerge.stream()
+ .map(Profile::getItemId)
+ .filter(mergedProfileId ->
!StringUtils.equals(mergedProfileId, masterProfileId))
+ .collect(Collectors.toList());
+
+ // ASYNC: Update browsing data (events/sessions) for merged
profiles
+ reassignPersistedBrowsingDatasAsync(anonymousBrowsing,
mergedProfileIds, masterProfileId);
+
+ // Save event, as we dynamically changed the profileId of the
current event
if (event.isPersistent()) {
persistenceService.save(event);
}
- for (Profile profileToBeMerge : profilesToBeMerge) {
- String profileToBeMergeId = profileToBeMerge.getItemId();
- if (!StringUtils.equals(profileToBeMergeId,
mergedProfileId)) {
-
- // TODO (UNOMI-748): the following updates are
asynchron due to usage of bulk processor in ElasticSearch persistence service
update function.
- // We could consider replacing those updates(one item
at a time) by updateByQueryAndScript to avoid loading all the sessions/events
in memory,
- // but we would loose the asynchronous nature (By
doing that request may take longer than before,
- // and could potentially break client side timeouts
on requests)
- List<Event> oldEvents =
persistenceService.query("profileId", profileToBeMergeId, null, Event.class);
- for (Event oldEvent : oldEvents) {
- if
(!oldEvent.getItemId().equals(event.getItemId())) {
- persistenceService.update(oldEvent,
Event.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
- }
- }
-
- // TODO (UNOMI-749): this is creating inconsistent
sessions, they still contains old profile.
- // And due to deserialization of sessions the
profileId property will always be the one from profile stored in the session
- List<Session> oldSessions =
persistenceService.query("profileId", profileToBeMergeId, null, Session.class);
- for (Session oldSession : oldSessions) {
- if
(!oldSession.getItemId().equals(event.getSession().getItemId())) {
- persistenceService.update(oldSession,
Session.class, "profileId", anonymousBrowsing ? null : mergedProfileId);
- }
- }
-
- final String clientIdFromEvent = (String)
event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE);
- String clientId = clientIdFromEvent != null ?
clientIdFromEvent : "defaultClientId";
- profileService.addAliasToProfile(mergedProfileId,
profileToBeMergeId, clientId);
- if (profileService.load(profileToBeMergeId) != null) {
- profileService.delete(profileToBeMergeId, false);
- }
+ // Handle aliases
+ for (String mergedProfileId : mergedProfileIds) {
+ profileService.addAliasToProfile(masterProfileId,
mergedProfileId, clientId);
+ if (persistenceService.load(mergedProfileId,
Profile.class) != null) {
+ profileService.delete(mergedProfileId, false);
}
}
} catch (Exception e) {
@@ -164,7 +152,32 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
return persistenceService.query(propertyCondition,
"properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList();
}
- private void reassignSession(Event event, List<Profile>
existingMergedProfiles, boolean forceEventProfileAsMaster, String
mergePropName, String mergePropValue) {
+ private void reassignPersistedBrowsingDatasAsync(boolean
anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) {
+ schedulerService.getSharedScheduleExecutorService().schedule(new
TimerTask() {
+ @Override
+ public void run() {
+ if (!anonymousBrowsing) {
+ Condition profileIdsCondition = new
Condition(definitionsService.getConditionType("eventPropertyCondition"));
+
profileIdsCondition.setParameter("propertyName","profileId");
+
profileIdsCondition.setParameter("comparisonOperator","in");
+ profileIdsCondition.setParameter("propertyValues",
mergedProfileIds);
+
+ String[] scripts = new String[]{"updateProfileId"};
+ Map<String, Object>[] scriptParams = new
Map[]{Collections.singletonMap("profileId", masterProfileId)};
+ Condition[] conditions = new
Condition[]{profileIdsCondition};
+
+
persistenceService.updateWithQueryAndStoredScript(Session.class, scripts,
scriptParams, conditions);
+
persistenceService.updateWithQueryAndStoredScript(Event.class, scripts,
scriptParams, conditions);
+ } else {
+ for (String mergedProfileId : mergedProfileIds) {
+ privacyService.anonymizeBrowsingData(mergedProfileId);
+ }
+ }
+ }
+ }, 1000, TimeUnit.MILLISECONDS);
+ }
+
+ private void reassignCurrentBrowsingData(Event event, List<Profile>
existingMergedProfiles, boolean forceEventProfileAsMaster, String
mergePropName, String mergePropValue) {
Profile eventProfile = event.getProfile();
if (existingMergedProfiles.size() > 0) {
@@ -179,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
eventProfile.getSystemProperties().put(mergePropName,
mergePropValue);
}
- logger.info("Different users, switch to " + eventProfile.getItemId());
+ logger.info("Different users, switch to {}", eventProfile.getItemId());
// At the end of the merge, we must set the merged profile as profile
event to process other Actions
event.setProfileId(eventProfile.getItemId());
event.setProfile(eventProfile);
@@ -212,6 +225,10 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
this.definitionsService = definitionsService;
}
+ public void setSchedulerService(SchedulerService schedulerService) {
+ this.schedulerService = schedulerService;
+ }
+
public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) {
this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge);
}
diff --git
a/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless
b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless
new file mode 100644
index 000000000..31f900c62
--- /dev/null
+++
b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ This script is used to update the profileId and the inner profile.itemId in
sessions and/or events after a merge situation
+ required params:
+ - params.profileId: the ID of the master profile to assign
+*/
+
+// update profileId
+if (ctx._source.containsKey("profileId") && ctx._source.profileId !=
params.profileId) {
+ ctx._source.put("profileId", params.profileId)
+}
+
+// update inner profile.itemId if the inner profile exists (in sessions for
example)
+if (ctx._source.containsKey("profile") &&
ctx._source.profile.containsKey("itemId") && ctx._source.profile.itemId !=
params.profileId) {
+ ctx._source.profile.put("itemId", params.profileId)
+}
\ No newline at end of file
diff --git
a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 3222eb680..15b24ec39 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -43,6 +43,7 @@
<reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="profileService"
interface="org.apache.unomi.api.services.ProfileService"/>
<reference id="privacyService"
interface="org.apache.unomi.api.services.PrivacyService"/>
+ <reference id="schedulerService"
interface="org.apache.unomi.api.services.SchedulerService"/>
<reference id="segmentService"
interface="org.apache.unomi.api.services.SegmentService"/>
<reference id="eventService"
interface="org.apache.unomi.api.services.EventService"/>
<reference id="configSharingService"
interface="org.apache.unomi.api.services.ConfigSharingService"/>
@@ -315,6 +316,7 @@
<property name="persistenceService" ref="persistenceService"/>
<property name="definitionsService" ref="definitionsService"/>
<property name="privacyService" ref="privacyService"/>
+ <property name="schedulerService" ref="schedulerService"/>
<property name="maxProfilesInOneMerge"
value="${base.maxProfilesInOneMerge}"/>
</bean>
</service>
diff --git a/services/pom.xml b/services/pom.xml
index a1b909362..a6e739831 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -116,10 +116,6 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.karaf.cellar</groupId>
<artifactId>org.apache.karaf.cellar.core</artifactId>
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 216261c5a..1cc852f24 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -20,9 +20,6 @@ package org.apache.unomi.services.impl.segments;
import com.fasterxml.jackson.core.JsonProcessingException;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.CharEncoding;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.Metadata;
@@ -52,9 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.ZoneOffset;
@@ -163,7 +158,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
}
bundleContext.addBundleListener(this);
initializeTimer();
- loadScripts();
logger.info("Segment service initialized.");
}
@@ -1239,25 +1233,6 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0,
segmentRefreshInterval, TimeUnit.MILLISECONDS);
}
- private void loadScripts() throws IOException {
- Enumeration<URL> scriptsURL =
bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless",
true);
- if (scriptsURL == null) {
- return;
- }
-
- Map<String, String> scriptsById = new HashMap<>();
- while (scriptsURL.hasMoreElements()) {
- URL scriptURL = scriptsURL.nextElement();
- logger.debug("Found painless script at " + scriptURL + ",
loading... ");
- try (InputStream in = scriptURL.openStream()) {
- String script = IOUtils.toString(in, StandardCharsets.UTF_8);
- String scriptId =
FilenameUtils.getBaseName(scriptURL.getPath());
- scriptsById.put(scriptId, script);
- }
- }
- persistenceService.storeScripts(scriptsById);
- }
-
public void setTaskExecutionPeriod(long taskExecutionPeriod) {
this.taskExecutionPeriod = taskExecutionPeriod;
}