This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.x by this push:
new 1c2b78c0e UNOMI-784: task backport (#638)
1c2b78c0e is described below
commit 1c2b78c0e7682e55ba11d37ac82e4e429f52b17b
Author: kevan Jahanshahi <[email protected]>
AuthorDate: Thu Aug 3 18:05:59 2023 +0200
UNOMI-784: task backport (#638)
---
.../test/java/org/apache/unomi/itests/BaseIT.java | 1 +
.../org/apache/unomi/itests/ProfileServiceIT.java | 13 +-
.../main/resources/etc/custom.system.properties | 15 +-
.../ElasticSearchPersistenceServiceImpl.java | 190 +++++++++++----------
.../client/CustomRestHighLevelClient.java | 76 +++++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 6 +-
.../org.apache.unomi.persistence.elasticsearch.cfg | 18 +-
.../unomi/persistence/spi/PersistenceService.java | 15 ++
.../actions/MergeProfilesOnPropertyAction.java | 3 +-
9 files changed, 239 insertions(+), 98 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index c8420a324..0ba3ac46f 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -175,6 +175,7 @@ public abstract class BaseIT {
editConfigurationFilePut("etc/org.apache.karaf.features.cfg",
"serviceRequirements", "disable"),
// editConfigurationFilePut("etc/org.ops4j.pax.web.cfg",
"org.osgi.service.http.port", HTTP_PORT),
//
systemProperty("org.osgi.service.http.port").value(HTTP_PORT),
+ editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"),
systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
systemProperty("org.apache.unomi.itests.elasticsearch.transport.port").value("9500"),
systemProperty("org.apache.unomi.itests.elasticsearch.cluster.name").value("contextElasticSearchITests"),
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index fe82fa749..8264f5883 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -134,8 +134,17 @@ public class ProfileServiceIT extends BaseIT {
public void testGetProfileWithWrongScrollerIdThrowException() throws
InterruptedException, NoSuchFieldException, IllegalAccessException, IOException
{
boolean throwExceptionCurrent = false;
Configuration elasticSearchConfiguration =
configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch");
- if (elasticSearchConfiguration != null) {
- throwExceptionCurrent = Boolean.getBoolean((String)
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+ if (elasticSearchConfiguration != null &&
elasticSearchConfiguration.getProperties().get("throwExceptions") != null) {
+ try {
+ if
(elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof
String) {
+ throwExceptionCurrent = Boolean.parseBoolean((String)
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+ } else {
+ // already a boolean
+ throwExceptionCurrent = (Boolean)
elasticSearchConfiguration.getProperties().get("throwExceptions");
+ }
+ } catch (Throwable e) {
+ // Not able to cast the property
+ }
}
updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index 0628777f2..5c45700e1 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -118,7 +118,20 @@
org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U
org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10}
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000}
org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000}
-org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the
timeout for waiting for data or, put differently, a maximum period of
inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative
value is interpreted as undefined (system default).
+# Default: -1 (System default)
+org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1}
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations like update_by_query and delete_by_query are delegated to
ElasticSearch using tasks.
+# For consistency, the thread that triggers one of those operations will wait
for the task to be completed on the ElasticSearch side.
+# This timeout configuration is here to ensure the thread is not blocked
indefinitely, in case of very long-running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (one hour)
+org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000}
+# Defines the polling interval in milliseconds, which is used to check if the
task is completed on the ElasticSearch side
+# Default: 1000 (1 second)
+org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000}
org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false}
org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false}
org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-}
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6c0a0fa63..589bca1b1 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -62,27 +62,22 @@ import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.*;
+import org.elasticsearch.client.tasks.GetTaskRequest;
+import org.elasticsearch.client.tasks.GetTaskResponse;
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.reindex.*;
@@ -111,6 +106,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
import org.osgi.framework.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,18 +126,7 @@ import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -165,7 +150,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private static final Logger logger =
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private boolean throwExceptions = false;
- private RestHighLevelClient client;
+ private CustomRestHighLevelClient client;
private BulkProcessor bulkProcessor;
private String elasticSearchAddresses;
private List<String> elasticSearchAddressList = new ArrayList<>();
@@ -190,6 +175,8 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private Integer defaultQueryLimit = 10;
private Integer removeByQueryTimeoutInMinutes = 10;
+ private Integer taskWaitingTimeout = 3600000;
+ private Integer taskWaitingPollingInterval = 1000;
private String itemsMonthlyIndexedOverride = "event,session";
private String bulkProcessorConcurrentRequests = "1";
@@ -394,6 +381,18 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
this.logLevelRestClient = logLevelRestClient;
}
+ public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+ if (StringUtils.isNumeric(taskWaitingTimeout)) {
+ this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+ }
+ }
+
+ public void setTaskWaitingPollingInterval(String
taskWaitingPollingInterval) {
+ if (StringUtils.isNumeric(taskWaitingPollingInterval)) {
+ this.taskWaitingPollingInterval =
Integer.parseInt(taskWaitingPollingInterval);
+ }
+ }
+
public void start() throws Exception {
// Work around to avoid ES Logs regarding the deprecated
[ignore_throttled] parameter
@@ -471,7 +470,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
logger.info(this.getClass().getName() + " service started
successfully.");
}
- private void buildClient() {
+ private void buildClient() throws NoSuchFieldException,
IllegalAccessException {
List<Node> nodeList = new ArrayList<>();
for (String elasticSearchAddress : elasticSearchAddressList) {
String[] elasticSearchAddressParts =
elasticSearchAddress.split(":");
@@ -526,7 +525,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
});
logger.info("Connecting to ElasticSearch persistence backend using
cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
- client = new RestHighLevelClient(clientBuilder);
+ client = new CustomRestHighLevelClient(clientBuilder);
}
public BulkProcessor getBulkProcessor() {
@@ -989,59 +988,55 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.INLINE, "painless",
scripts[i], scriptParams[i]);
}
- return updateWithQueryAndScript(dateHint, clazz, builtScripts,
conditions);
+
+ return updateWithQueryAndScript(dateHint, new Class<?>[]{clazz},
builtScripts, conditions, true);
}
@Override
public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>
clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[]
conditions) {
+ return updateWithQueryAndStoredScript(dateHint, new Class<?>[]{clazz},
scripts, scriptParams, conditions, true);
+ }
+
+ @Override
+ public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[]
classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[]
conditions, boolean waitForComplete) {
Script[] builtScripts = new Script[scripts.length];
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i],
scriptParams[i]);
}
- return updateWithQueryAndScript(dateHint, clazz, builtScripts,
conditions);
+ return updateWithQueryAndScript(dateHint, classes, builtScripts,
conditions, waitForComplete);
}
- private boolean updateWithQueryAndScript(final Date dateHint, final
Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
+ private boolean updateWithQueryAndScript(final Date dateHint, final
Class<?>[] classes, final Script[] scripts, final Condition[] conditions,
boolean waitForComplete) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
- try {
- String itemType = Item.getItemType(clazz);
-
- String index = getIndexNameForQuery(itemType);
+ String[] itemTypes =
Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new);
+ String[] indices = Arrays.stream(itemTypes).map(itemType ->
getIndexNameForQuery(itemType)).toArray(String[]::new);
+ try {
for (int i = 0; i < scripts.length; i++) {
- RefreshRequest refreshRequest = new
RefreshRequest(index);
+ RefreshRequest refreshRequest = new
RefreshRequest(indices);
client.indices().refresh(refreshRequest,
RequestOptions.DEFAULT);
- UpdateByQueryRequest updateByQueryRequest = new
UpdateByQueryRequest(index);
+ QueryBuilder queryBuilder =
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
+ UpdateByQueryRequest updateByQueryRequest = new
UpdateByQueryRequest(indices);
updateByQueryRequest.setConflicts("proceed");
updateByQueryRequest.setMaxRetries(1000);
updateByQueryRequest.setSlices(2);
updateByQueryRequest.setScript(scripts[i]);
-
updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i]));
-
- BulkByScrollResponse response =
client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+ updateByQueryRequest.setQuery(queryBuilder);
- if (response.getBulkFailures().size() > 0) {
- for (BulkItemResponse.Failure failure :
response.getBulkFailures()) {
- logger.error("Failure : cause={} ,
message={}", failure.getCause(), failure.getMessage());
- }
+ TaskSubmissionResponse taskResponse =
client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+ if (taskResponse == null) {
+ logger.error("update with query and script: no
response returned for query: {}", queryBuilder);
+ } else if (waitForComplete) {
+ waitForTaskComplete(updateByQueryRequest,
taskResponse);
} else {
- logger.info("Update with query and script
processed {} entries in {}.", response.getUpdated(),
response.getTook().toString());
- }
- if (response.isTimedOut()) {
- logger.error("Update with query and script ended
with timeout!");
- }
- if (response.getVersionConflicts() > 0) {
- logger.warn("Update with query and script ended
with {} version conflicts!", response.getVersionConflicts());
- }
- if (response.getNoops() > 0) {
- logger.warn("Update Bwith query and script ended
with {} noops!", response.getNoops());
+ logger.debug("ES task started {}",
taskResponse.getTask());
}
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" +
clazz.getName(), e);
+ throw new Exception("No index found for itemTypes=" +
String.join(",", itemTypes), e);
} catch (ScriptException e) {
logger.error("Error in the update script : {}\n{}\n{}",
e.getScript(), e.getDetailedMessage(), e.getScriptStack());
throw new Exception("Error in the update script");
@@ -1055,6 +1050,53 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ private void waitForTaskComplete(AbstractBulkByScrollRequest request,
TaskSubmissionResponse response) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting task [{}]: [{}] using query: [{}], polling
every {}ms with a timeout configured to {}ms",
+ response.getTask(), request.toString(),
request.getSearchRequest().source().query(), taskWaitingPollingInterval,
taskWaitingTimeout);
+ }
+ long start = System.currentTimeMillis();
+ new InClassLoaderExecute<Void>(metricsService,
this.getClass().getName() + ".waitForTask", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
+ protected Void execute(Object... args) throws Exception {
+
+ TaskId taskId = new TaskId(response.getTask());
+ while (true){
+ Optional<GetTaskResponse> getTaskResponseOptional =
client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()),
RequestOptions.DEFAULT);
+ if (getTaskResponseOptional.isPresent()) {
+ GetTaskResponse getTaskResponse =
getTaskResponseOptional.get();
+ if (getTaskResponse.isCompleted()) {
+ if (logger.isDebugEnabled()) {
+ long millis =
getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000;
+ long seconds = millis / 1000;
+
+ logger.debug("Waiting task [{}]: Finished in
{} {}", taskId,
+ seconds >= 1 ? seconds : millis,
+ seconds >= 1 ? "seconds" :
"milliseconds");
+ }
+ break;
+ } else {
+ if ((start + taskWaitingTimeout) <
System.currentTimeMillis()) {
+ logger.error("Waiting task [{}]: Exceeded
configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout);
+ break;
+ }
+
+ try {
+ Thread.sleep(taskWaitingPollingInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Waiting task
[{}]: interrupted");
+ }
+ }
+ } else {
+ logger.error("Waiting task [{}]: No task found",
taskId);
+ break;
+ }
+ }
+ return null;
+ }
+ }.catchingExecuteInClassLoader(true);
+ }
+
@Override
public boolean storeScripts(Map<String, String> scripts) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".storeScripts", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
@@ -1161,8 +1203,9 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
+ QueryBuilder queryBuilder =
conditionESQueryBuilderDispatcher.getQueryBuilder(query);
final DeleteByQueryRequest deleteByQueryRequest = new
DeleteByQueryRequest(getIndexNameForQuery(itemType))
-
.setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+ .setQuery(queryBuilder)
// Setting slices to auto will let Elasticsearch
choose the number of slices to use.
// This setting will use one slice per shard, up
to a certain limit.
// The delete request will be more efficient and
faster than no slicing.
@@ -1176,46 +1219,14 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
// So we increase default timeout of 1min to 10min
.setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
- BulkByScrollResponse bulkByScrollResponse =
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+ TaskSubmissionResponse taskResponse =
client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
- if (bulkByScrollResponse == null) {
- logger.error("Remove by query: no response returned
for query: {}", query);
+ if (taskResponse == null) {
+ logger.error("Remove by query: no response returned
for query: {}", queryBuilder);
return false;
}
- if (bulkByScrollResponse.isTimedOut()) {
- logger.warn("Remove by query: timed out because took
more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
- }
-
- if ((bulkByScrollResponse.getSearchFailures() != null &&
bulkByScrollResponse.getSearchFailures().size() > 0) ||
- bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
- logger.warn("Remove by query: we found some failure
during the process of query: {}", query);
-
-
- if (bulkByScrollResponse.getSearchFailures() != null
&& bulkByScrollResponse.getSearchFailures().size() > 0) {
- for (ScrollableHitSource.SearchFailure
searchFailure : bulkByScrollResponse.getSearchFailures()) {
- logger.warn("Remove by query, search failure:
{}", searchFailure.toString());
- }
- }
-
- if (bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
- for (BulkItemResponse.Failure bulkFailure :
bulkByScrollResponse.getBulkFailures()) {
- logger.warn("Remove by query, bulk failure:
{}", bulkFailure.toString());
- }
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Remove by query: took {}, deleted docs:
{}, batches executed: {}, skipped docs: {}, version conflicts: {}, search
retries: {}, bulk retries: {}, for query: {}",
-
bulkByScrollResponse.getTook().toHumanReadableString(1),
- bulkByScrollResponse.getDeleted(),
- bulkByScrollResponse.getBatches(),
- bulkByScrollResponse.getNoops(),
- bulkByScrollResponse.getVersionConflicts(),
- bulkByScrollResponse.getSearchRetries(),
- bulkByScrollResponse.getBulkRetries(),
- query);
- }
+ waitForTaskComplete(deleteByQueryRequest, taskResponse);
return true;
} catch (Exception e) {
@@ -1230,7 +1241,6 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
-
public boolean indexTemplateExists(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".indexTemplateExists", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws IOException {
diff --git
a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
new file mode 100644
index 000000000..8fff8dea6
--- /dev/null
+++
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+
+import java.io.IOException;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * A custom Rest high level client that provides a way of using the Task
system on updateByQuery and deleteByQuery,
+ * by returning the response immediately (wait_for_completion set to false).
+ * See org.elasticsearch.client.RestHighLevelClient for the original code.
+ */
+public class CustomRestHighLevelClient extends RestHighLevelClient {
+
+ public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) {
+ super(restClientBuilder);
+ }
+
+ /**
+ * Executes a delete by query request.
+ * See <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
+ * Delete By Query API on elastic.co</a>
+ *
+ * @param deleteByQueryRequest the request
+ * @param options the request options (e.g. headers), use
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ */
+ public final TaskSubmissionResponse
submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions
options) throws IOException {
+ return performRequestAndParseEntity(
+ deleteByQueryRequest, innerDeleteByQueryRequest -> {
+ Request request =
RequestConverters.deleteByQuery(innerDeleteByQueryRequest);
+ request.addParameter("wait_for_completion", "false");
+ return request;
+ }, options, TaskSubmissionResponse::fromXContent, emptySet()
+ );
+ }
+
+ /**
+     * Executes an update by query request.
+ * See <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
+ * Update By Query API on elastic.co</a>
+ *
+ * @param updateByQueryRequest the request
+ * @param options the request options (e.g. headers), use
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ */
+ public final TaskSubmissionResponse
submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions
options) throws IOException {
+ return performRequestAndParseEntity(
+ updateByQueryRequest, innerUpdateByQueryRequest -> {
+ Request request =
RequestConverters.updateByQuery(updateByQueryRequest);
+ request.addParameter("wait_for_completion", "false");
+ return request;
+ }, options, TaskSubmissionResponse::fromXContent, emptySet()
+ );
+ }
+}
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9c3800f76..bdf2dcc12 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -54,7 +54,9 @@
<cm:property name="maximalElasticSearchVersion" value="8.0.0" />
<cm:property name="aggregateQueryBucketSize" value="5000" />
- <cm:property name="clientSocketTimeout" value="" />
+ <cm:property name="clientSocketTimeout" value="-1" />
+ <cm:property name="taskWaitingTimeout" value="3600000" />
+ <cm:property name="taskWaitingPollingInterval" value="1000" />
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
<cm:property name="itemTypeToRefreshPolicy" value="" />
@@ -131,6 +133,8 @@
<property name="itemTypeToRefreshPolicy"
value="${es.itemTypeToRefreshPolicy}" />
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}"
/>
+ <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" />
+ <property name="taskWaitingPollingInterval"
value="${es.taskWaitingPollingInterval}" />
<property name="metricsService" ref="metricsService" />
<property name="useBatchingForSave" value="${es.useBatchingForSave}" />
diff --git
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index b996c2a1c..5af2232e4 100644
---
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -57,8 +57,22 @@
maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000
# Disable partitions on aggregation queries for past events.
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false}
-# max socket timeout in millis
-clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the
timeout for waiting for data or, put differently, a maximum period of
inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative
value is interpreted as undefined (system default).
+# Default: -1
+clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1}
+
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations like update_by_query and delete_by_query are delegated to
ElasticSearch using tasks.
+# For consistency, the thread that triggers one of those operations will wait
for the task to be completed on the ElasticSearch side.
+# This timeout configuration is here to ensure the thread is not blocked
indefinitely, in case of very long-running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (1 hour)
+taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000}
+
+# Defines the polling interval in milliseconds, which is used to check if the
task is completed on the ElasticSearch side
+# Default: 1000 (1 second)
+taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000}
# refresh policy per item type in Json.
# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is
NONE.
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index c4e62f66f..c4cea9943 100644
---
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -205,6 +205,21 @@ public interface PersistenceService {
*/
boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz,
String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
+ /**
+     * Updates the items of the specified classes by a query with a new
property value for the specified property name,
+     * based on the provided stored scripts and script parameters.
+     * This method is able to perform an update on multiple types in a single
run; be careful with your query as it will be performed on all of them.
+ *
+ * @param dateHint a Date helping in identifying where the item is
located
+ * @param classes classes of items to update, be careful all of them
will be submitted to update for all scripts/conditions
+ * @param scripts Stored scripts name
+ * @param scriptParams script params array
+ * @param conditions conditions array
+ * @param waitForComplete if true, wait for the ES execution to be complete
+ * @return {@code true} if the update was successful, {@code false}
otherwise
+ */
+ boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[] classes,
String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions,
boolean waitForComplete);
+
/**
* Store script in the Database for later usage with
updateWithQueryAndStoredScript function for example.
*
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 3d90f6317..bef940e0d 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -201,8 +201,7 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
Map<String, Object>[] scriptParams = new
Map[]{Collections.singletonMap("profileId", masterProfileId)};
Condition[] conditions = new
Condition[]{profileIdsCondition};
- persistenceService.updateWithQueryAndStoredScript(null,
Session.class, scripts, scriptParams, conditions);
- persistenceService.updateWithQueryAndStoredScript(null,
Event.class, scripts, scriptParams, conditions);
+ persistenceService.updateWithQueryAndStoredScript(null,
new Class[]{Session.class, Event.class}, scripts, scriptParams, conditions,
false);
} else {
for (String mergedProfileId : mergedProfileIds) {
privacyService.anonymizeBrowsingData(mergedProfileId);