This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch upgradeESSocketTimeout in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 5de02d8995fe7b67748c51128c4f28af4ca09897 Author: Kevan <ke...@jahia.com> AuthorDate: Thu Jul 13 20:48:56 2023 +0200 UNOMI-784: set ES client socket timeout to 180sec instead of 30sec by default. --- itests/pom.xml | 1 + .../test/java/org/apache/unomi/itests/BaseIT.java | 19 ++++-- .../java/org/apache/unomi/itests/SegmentIT.java | 79 ++++++++++++++++++++-- .../main/resources/etc/custom.system.properties | 2 +- .../resources/OSGI-INF/blueprint/blueprint.xml | 2 +- .../org.apache.unomi.persistence.elasticsearch.cfg | 2 +- 6 files changed, 90 insertions(+), 15 deletions(-) diff --git a/itests/pom.xml b/itests/pom.xml index 24e8b65ce..8bd5fd908 100644 --- a/itests/pom.xml +++ b/itests/pom.xml @@ -226,6 +226,7 @@ </environmentVariables> <instanceSettings> <properties> + <xpack.ml.enabled>false</xpack.ml.enabled> <path.repo>${project.build.directory}/snapshots_repository</path.repo> <cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled> <http.cors.allow-origin>*</http.cors.allow-origin> diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index 7e70e40ab..20aa733a5 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -93,12 +93,7 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Dictionary; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import java.util.function.Supplier; @@ -381,13 +376,23 @@ public abstract class BaseIT extends KarafTestSupport { persistenceService = getService(PersistenceService.class); definitionsService = 
getService(DefinitionsService.class); rulesService = getService(RulesService.class); + segmentService = getService(SegmentService.class); } public void updateConfiguration(String serviceName, String configPid, String propName, Object propValue) throws InterruptedException, IOException { + Map<String, Object> props = new HashMap<>(); + props.put(propName, propValue); + updateConfiguration(serviceName, configPid, props); + } + + public void updateConfiguration(String serviceName, String configPid, Map<String, Object> propsToSet) + throws InterruptedException, IOException { org.osgi.service.cm.Configuration cfg = configurationAdmin.getConfiguration(configPid); Dictionary<String, Object> props = cfg.getProperties(); - props.put(propName, propValue); + for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) { + props.put(propToSet.getKey(), propToSet.getValue()); + } waitForReRegistration(serviceName, () -> { try { diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java index 12fb99da8..c4c3d1a9c 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java @@ -39,6 +39,7 @@ import org.ops4j.pax.exam.junit.PaxExam; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerSuite; import org.ops4j.pax.exam.util.Filter; +import org.osgi.service.cm.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +47,7 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; @RunWith(PaxExam.class) @ExamReactorStrategy(PerSuite.class) @@ -730,4 +727,76 @@ public class SegmentIT extends BaseIT { updatedProfile.getScores() == null || 
!updatedProfile.getScores().containsKey("relative-date-scoring-test")), 1000, 20); } + + + @Test + public void testScoringRecalculationTimeout() throws Exception { + Configuration elasticSearchConfiguration = configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch"); + String currentClientSocketTimeout = (String) elasticSearchConfiguration.getProperties().get("clientSocketTimeout"); + + // create a lot of profiles + for (int i = 0; i <= 100000; i++) { + String profileId = "test_profile_id_" + i; + + LOGGER.info("Creating profile: " + profileId); + Profile profile = new Profile(); + profile.setSystemProperty("lastUpdated", new Date()); + profile.setItemId(profileId); + + persistenceService.save(profile, true); + + // save events for the profiles + Date timestampEventInRange = new SimpleDateFormat("yyyy-MM-dd").parse("2000-10-30"); + persistenceService.save(new Event("test-event-type", null, profile, null, null, profile, timestampEventInRange), true); + } + refreshPersistence(Event.class, Profile.class); + + // create the past event condition + Condition pastEventCondition = new Condition(definitionsService.getConditionType("pastEventCondition")); + pastEventCondition.setParameter("minimumEventCount", 1); + pastEventCondition.setParameter("maximumEventCount", 2); + + pastEventCondition.setParameter("fromDate", "2000-07-15T07:00:00Z"); + pastEventCondition.setParameter("toDate", "2001-01-15T07:00:00Z"); + ; + Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition")); + pastEventEventCondition.setParameter("eventTypeId", "test-event-type"); + pastEventCondition.setParameter("eventCondition", pastEventEventCondition); + + // create the scoring + Metadata scoringMetadata = new Metadata("scoring-recalculation-socket-timeout"); + Scoring scoring = new Scoring(scoringMetadata); + List<ScoringElement> scoringElements = new ArrayList<>(); + ScoringElement scoringElement = new ScoringElement(); + 
scoringElement.setCondition(pastEventCondition); + scoringElement.setValue(50); + scoringElements.add(scoringElement); + scoring.setElements(scoringElements); + + Map<String, Object> configUpdate = new HashMap<>(); + configUpdate.put("clientSocketTimeout", "50"); + configUpdate.put("throwExceptions", true); + updateConfiguration(PersistenceService.class.getName(), "org.apache.unomi.persistence.elasticsearch", configUpdate); + + boolean exceptionTriggered = false; + try { + segmentService.setScoringDefinition(scoring); + } catch (Exception e) { + // This is expected since we reduce the SocketTimeout to be really short. + exceptionTriggered = true; + } finally { + configUpdate.put("clientSocketTimeout", currentClientSocketTimeout); + configUpdate.put("throwExceptions", false); + updateConfiguration(PersistenceService.class.getName(), "org.apache.unomi.persistence.elasticsearch", configUpdate); + } + + Assert.assertTrue("We should have a SocketTimeoutException due to reduce timeout", exceptionTriggered); + keepTrying("Check that profiles are correctly updated even in case of SocketTimeoutException", () -> { + Condition condition = new Condition(definitionsService.getConditionType("scoringCondition")); + condition.setParameter("scoringPlanId", "scoring-recalculation-socket-timeout"); + condition.setParameter("scoreValue", 50); + return persistenceService.queryCount(condition, Profile.ITEM_TYPE); + }, + count -> count == 100001, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } } diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index eb65ab026..9f5bf5bd8 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -132,7 +132,7 @@ org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10} 
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000} org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000} -org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-} +org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-180000} org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false} org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false} org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-} diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 507d8789f..3c522707d 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -63,7 +63,7 @@ <cm:property name="maximalElasticSearchVersion" value="8.0.0" /> <cm:property name="aggregateQueryBucketSize" value="5000" /> - <cm:property name="clientSocketTimeout" value="" /> + <cm:property name="clientSocketTimeout" value="180000" /> <cm:property name="aggQueryMaxResponseSizeHttp" value="" /> <cm:property name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemTypeToRefreshPolicy" value="" /> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index 086941e80..ad4144558 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -74,7 +74,7 @@ maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000 pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false} # max socket timeout in millis -clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-} +clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-180000} # refresh policy per item type in Json. # Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE.