This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 64169942b UNOMI-784: use tasks to perform update_by_query and
delete_by_query (#636)
64169942b is described below
commit 64169942b2ca16ee4bd9d1a5cd2f047fd3c194d8
Author: kevan Jahanshahi <[email protected]>
AuthorDate: Thu Aug 3 12:28:34 2023 +0200
UNOMI-784: use tasks to perform update_by_query and delete_by_query (#636)
* UNOMI-784: set ES client socket timeout to 80sec instead of 30sec by
default.
* use wait_for_completion for update by script / delete queries
* Fix integration test
* UNOMI-784: simplify the client wrapper
* UNOMI-784: add some clarity to config related to socket timeout
* UNOMI-784: wait for task to complete
* UNOMI-784: improve merge updateByQuery to perform a single task instead
of two separate ones
* UNOMI-784: Typo
* UNOMI-784: add configurations regarding tasks waiting timeout and polling
interval
* UNOMI-784: set task completion log level to debug
* Don't wait for task to be complete for merge profile ES script
* remove non required test
* UNOMI-784: Try speed up the tests and set waitForCompletion to be true in
service
---------
Co-authored-by: David Griffon <[email protected]>
---
itests/pom.xml | 1 +
.../test/java/org/apache/unomi/itests/BaseIT.java | 20 +-
.../org/apache/unomi/itests/ProfileServiceIT.java | 13 +-
.../java/org/apache/unomi/itests/SegmentIT.java | 7 +-
.../main/resources/etc/custom.system.properties | 15 +-
.../ElasticSearchPersistenceServiceImpl.java | 211 +++++++++++++--------
.../client/CustomRestHighLevelClient.java | 76 ++++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 6 +-
.../org.apache.unomi.persistence.elasticsearch.cfg | 18 +-
.../unomi/persistence/spi/PersistenceService.java | 14 ++
.../actions/MergeProfilesOnPropertyAction.java | 3 +-
11 files changed, 282 insertions(+), 102 deletions(-)
diff --git a/itests/pom.xml b/itests/pom.xml
index 24e8b65ce..8bd5fd908 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -226,6 +226,7 @@
</environmentVariables>
<instanceSettings>
<properties>
+ <xpack.ml.enabled>false</xpack.ml.enabled>
<path.repo>${project.build.directory}/snapshots_repository</path.repo>
<cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
<http.cors.allow-origin>*</http.cors.allow-origin>
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 7e70e40ab..28cf43c7b 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -93,12 +93,7 @@ import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -256,6 +251,7 @@ public abstract class BaseIT extends KarafTestSupport {
editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.graphql.feature.activated", "true"),
editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.elasticsearch.cluster.name", "contextElasticSearchITests"),
editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.elasticsearch.addresses", "localhost:9400"),
+ editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"),
systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"),
@@ -381,13 +377,23 @@ public abstract class BaseIT extends KarafTestSupport {
persistenceService = getService(PersistenceService.class);
definitionsService = getService(DefinitionsService.class);
rulesService = getService(RulesService.class);
+ segmentService = getService(SegmentService.class);
}
public void updateConfiguration(String serviceName, String configPid,
String propName, Object propValue)
throws InterruptedException, IOException {
+ Map<String, Object> props = new HashMap<>();
+ props.put(propName, propValue);
+ updateConfiguration(serviceName, configPid, props);
+ }
+
+ public void updateConfiguration(String serviceName, String configPid,
Map<String, Object> propsToSet)
+ throws InterruptedException, IOException {
org.osgi.service.cm.Configuration cfg =
configurationAdmin.getConfiguration(configPid);
Dictionary<String, Object> props = cfg.getProperties();
- props.put(propName, propValue);
+ for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) {
+ props.put(propToSet.getKey(), propToSet.getValue());
+ }
waitForReRegistration(serviceName, () -> {
try {
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 306f9d8c8..623904938 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -155,8 +155,17 @@ public class ProfileServiceIT extends BaseIT {
throws InterruptedException, NoSuchFieldException,
IllegalAccessException, IOException {
boolean throwExceptionCurrent = false;
Configuration elasticSearchConfiguration =
configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch");
- if (elasticSearchConfiguration != null) {
- throwExceptionCurrent = Boolean.getBoolean((String)
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+ if (elasticSearchConfiguration != null &&
elasticSearchConfiguration.getProperties().get("throwExceptions") != null) {
+ try {
+ if
(elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof
String) {
+ throwExceptionCurrent = Boolean.parseBoolean((String)
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+ } else {
+ // already a boolean
+ throwExceptionCurrent = (Boolean)
elasticSearchConfiguration.getProperties().get("throwExceptions");
+ }
+ } catch (Throwable e) {
+ // Not able to cast the property
+ }
}
updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 12fb99da8..3b3aa9314 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -39,6 +39,7 @@ import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
import org.ops4j.pax.exam.util.Filter;
+import org.osgi.service.cm.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +47,7 @@ import javax.inject.Inject;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index eb65ab026..d421c7b43 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -132,7 +132,20 @@
org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U
org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10}
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000}
org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000}
-org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the
timeout for waiting for data or, put differently, a maximum period of inactivity
between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative
value is interpreted as undefined (system default).
+# Default: -1 (System default)
+org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1}
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations like update_by_query and delete_by_query are delegated to
ElasticSearch using tasks
+# For consistency the thread that triggers one of those operations will wait
for the task to be completed on the ElasticSearch side.
+# This timeout configuration is here to ensure the thread is not blocked
indefinitely, in case of very long running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (one hour)
+org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000}
+# Defines the polling interval in milliseconds, which is used to check if a
task is completed on the ElasticSearch side
+# Default: 1000 (1 second)
+org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000}
org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false}
org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false}
org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-}
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 2ad652387..8ca6526f1 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -64,13 +64,16 @@ import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
+import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indexlifecycle.*;
import org.elasticsearch.client.indices.*;
+import org.elasticsearch.client.tasks.GetTaskRequest;
+import org.elasticsearch.client.tasks.GetTaskResponse;
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.bytes.BytesReference;
@@ -78,9 +81,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.reindex.*;
@@ -109,6 +110,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
import org.osgi.framework.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,7 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private static final String ROLLOVER_LIFECYCLE_NAME =
"unomi-rollover-policy";
private boolean throwExceptions = false;
- private RestHighLevelClient client;
+ private CustomRestHighLevelClient client;
private BulkProcessor bulkProcessor;
private String elasticSearchAddresses;
private List<String> elasticSearchAddressList = new ArrayList<>();
@@ -168,6 +170,8 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private Integer defaultQueryLimit = 10;
private Integer removeByQueryTimeoutInMinutes = 10;
+ private Integer taskWaitingTimeout = 3600000;
+ private Integer taskWaitingPollingInterval = 1000;
private String bulkProcessorConcurrentRequests = "1";
private String bulkProcessorBulkActions = "1000";
@@ -434,6 +438,18 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
this.logLevelRestClient = logLevelRestClient;
}
+ public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+ if (StringUtils.isNumeric(taskWaitingTimeout)) {
+ this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+ }
+ }
+
+ public void setTaskWaitingPollingInterval(String
taskWaitingPollingInterval) {
+ if (StringUtils.isNumeric(taskWaitingPollingInterval)) {
+ this.taskWaitingPollingInterval =
Integer.parseInt(taskWaitingPollingInterval);
+ }
+ }
+
public void start() throws Exception {
// Work around to avoid ES Logs regarding the deprecated
[ignore_throttled] parameter
@@ -505,7 +521,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
logger.info(this.getClass().getName() + " service started
successfully.");
}
- private void buildClient() {
+ private void buildClient() throws NoSuchFieldException,
IllegalAccessException {
List<Node> nodeList = new ArrayList<>();
for (String elasticSearchAddress : elasticSearchAddressList) {
String[] elasticSearchAddressParts =
elasticSearchAddress.split(":");
@@ -560,7 +576,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
});
logger.info("Connecting to ElasticSearch persistence backend using
cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
- client = new RestHighLevelClient(clientBuilder);
+ client = new CustomRestHighLevelClient(clientBuilder);
}
public BulkProcessor getBulkProcessor() {
@@ -1087,64 +1103,59 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.INLINE, "painless",
scripts[i], scriptParams[i]);
}
- return updateWithQueryAndScript(clazz, builtScripts, conditions);
+ return updateWithQueryAndScript(new Class<?>[]{clazz}, builtScripts,
conditions, true);
}
@Override
public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>
clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[]
conditions) {
- return updateWithQueryAndStoredScript(clazz, scripts, scriptParams,
conditions);
+ return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts,
scriptParams, conditions, true);
}
@Override
public boolean updateWithQueryAndStoredScript(Class<?> clazz, String[]
scripts, Map<String, Object>[] scriptParams, Condition[] conditions) {
+ return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts,
scriptParams, conditions, true);
+ }
+
+ @Override
+ public boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[]
scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean
waitForComplete) {
Script[] builtScripts = new Script[scripts.length];
for (int i = 0; i < scripts.length; i++) {
builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i],
scriptParams[i]);
}
- return updateWithQueryAndScript(clazz, builtScripts, conditions);
+ return updateWithQueryAndScript(classes, builtScripts, conditions,
waitForComplete);
}
- private boolean updateWithQueryAndScript(final Class<?> clazz, final
Script[] scripts, final Condition[] conditions) {
+ private boolean updateWithQueryAndScript(final Class<?>[] classes, final
Script[] scripts, final Condition[] conditions, boolean waitForComplete) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
- try {
- String itemType = Item.getItemType(clazz);
- String index = getIndex(itemType);
+ String[] itemTypes =
Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new);
+ String[] indices = Arrays.stream(itemTypes).map(itemType ->
getIndexNameForQuery(itemType)).toArray(String[]::new);
+ try {
for (int i = 0; i < scripts.length; i++) {
- RefreshRequest refreshRequest = new
RefreshRequest(index);
+ RefreshRequest refreshRequest = new
RefreshRequest(indices);
client.indices().refresh(refreshRequest,
RequestOptions.DEFAULT);
- QueryBuilder queryBuilder =
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
- UpdateByQueryRequest updateByQueryRequest = new
UpdateByQueryRequest(index);
+ QueryBuilder queryBuilder =
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
+ UpdateByQueryRequest updateByQueryRequest = new
UpdateByQueryRequest(indices);
updateByQueryRequest.setConflicts("proceed");
updateByQueryRequest.setMaxRetries(1000);
updateByQueryRequest.setSlices(2);
updateByQueryRequest.setScript(scripts[i]);
-
updateByQueryRequest.setQuery(isItemTypeSharingIndex(itemType) ?
wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder);
-
- BulkByScrollResponse response =
client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+
updateByQueryRequest.setQuery(wrapWithItemsTypeQuery(itemTypes, queryBuilder));
- if (response.getBulkFailures().size() > 0) {
- for (BulkItemResponse.Failure failure :
response.getBulkFailures()) {
- logger.error("Failure : cause={} ,
message={}", failure.getCause(), failure.getMessage());
- }
+ TaskSubmissionResponse taskResponse =
client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+ if (taskResponse == null) {
+ logger.error("update with query and script: no
response returned for query: {}", queryBuilder);
+ } else if (waitForComplete) {
+ waitForTaskComplete(updateByQueryRequest,
taskResponse);
} else {
- logger.info("Update with query and script
processed {} entries in {}.", response.getUpdated(),
response.getTook().toString());
- }
- if (response.isTimedOut()) {
- logger.error("Update with query and script ended
with timeout!");
- }
- if (response.getVersionConflicts() > 0) {
- logger.warn("Update with query and script ended
with {} version conflicts!", response.getVersionConflicts());
- }
- if (response.getNoops() > 0) {
- logger.warn("Update Bwith query and script ended
with {} noops!", response.getNoops());
+ logger.debug("ES task started {}",
taskResponse.getTask());
}
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" +
clazz.getName(), e);
+ throw new Exception("No index found for itemTypes=" +
String.join(",", itemTypes), e);
} catch (ScriptException e) {
logger.error("Error in the update script : {}\n{}\n{}",
e.getScript(), e.getDetailedMessage(), e.getScriptStack());
throw new Exception("Error in the update script");
@@ -1158,6 +1169,53 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ private void waitForTaskComplete(AbstractBulkByScrollRequest request,
TaskSubmissionResponse response) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting task [{}]: [{}] using query: [{}], polling
every {}ms with a timeout configured to {}ms",
+ response.getTask(), request.toString(),
request.getSearchRequest().source().query(), taskWaitingPollingInterval,
taskWaitingTimeout);
+ }
+ long start = System.currentTimeMillis();
+ new InClassLoaderExecute<Void>(metricsService,
this.getClass().getName() + ".waitForTask", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
+ protected Void execute(Object... args) throws Exception {
+
+ TaskId taskId = new TaskId(response.getTask());
+ while (true){
+ Optional<GetTaskResponse> getTaskResponseOptional =
client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()),
RequestOptions.DEFAULT);
+ if (getTaskResponseOptional.isPresent()) {
+ GetTaskResponse getTaskResponse =
getTaskResponseOptional.get();
+ if (getTaskResponse.isCompleted()) {
+ if (logger.isDebugEnabled()) {
+ long millis =
getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000;
+ long seconds = millis / 1000;
+
+ logger.debug("Waiting task [{}]: Finished in
{} {}", taskId,
+ seconds >= 1 ? seconds : millis,
+ seconds >= 1 ? "seconds" :
"milliseconds");
+ }
+ break;
+ } else {
+ if ((start + taskWaitingTimeout) <
System.currentTimeMillis()) {
+ logger.error("Waiting task [{}]: Exceeded
configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout);
+ break;
+ }
+
+ try {
+ Thread.sleep(taskWaitingPollingInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Waiting task
[{}]: interrupted");
+ }
+ }
+ } else {
+ logger.error("Waiting task [{}]: No task found",
taskId);
+ break;
+ }
+ }
+ return null;
+ }
+ }.catchingExecuteInClassLoader(true);
+ }
+
@Override
public boolean storeScripts(Map<String, String> scripts) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService,
this.getClass().getName() + ".storeScripts", this.bundleContext,
this.fatalIllegalStateErrors, throwExceptions) {
@@ -1295,7 +1353,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
try {
String itemType = Item.getItemType(clazz);
final DeleteByQueryRequest deleteByQueryRequest = new
DeleteByQueryRequest(getIndexNameForQuery(itemType))
- .setQuery(isItemTypeSharingIndex(itemType) ?
wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
+ .setQuery(wrapWithItemTypeQuery(itemType, queryBuilder))
// Setting slices to auto will let Elasticsearch choose
the number of slices to use.
// This setting will use one slice per shard, up to a
certain limit.
// The delete request will be more efficient and faster
than no slicing.
@@ -1309,45 +1367,14 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
// So we increase default timeout of 1min to 10min
.setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
- BulkByScrollResponse bulkByScrollResponse =
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+ TaskSubmissionResponse taskResponse =
client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
- if (bulkByScrollResponse == null) {
+ if (taskResponse == null) {
logger.error("Remove by query: no response returned for query:
{}", queryBuilder);
return false;
}
- if (bulkByScrollResponse.isTimedOut()) {
- logger.warn("Remove by query: timed out because took more than
{} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder);
- }
-
- if ((bulkByScrollResponse.getSearchFailures() != null &&
bulkByScrollResponse.getSearchFailures().size() > 0) ||
- bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
- logger.warn("Remove by query: we found some failure during the
process of query: {}", queryBuilder);
-
- if (bulkByScrollResponse.getSearchFailures() != null &&
bulkByScrollResponse.getSearchFailures().size() > 0) {
- for (ScrollableHitSource.SearchFailure searchFailure :
bulkByScrollResponse.getSearchFailures()) {
- logger.warn("Remove by query, search failure: {}",
searchFailure.toString());
- }
- }
-
- if (bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
- for (BulkItemResponse.Failure bulkFailure :
bulkByScrollResponse.getBulkFailures()) {
- logger.warn("Remove by query, bulk failure: {}",
bulkFailure.toString());
- }
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Remove by query: took {}, deleted docs: {},
batches executed: {}, skipped docs: {}, version conflicts: {}, search retries:
{}, bulk retries: {}, for query: {}",
-
bulkByScrollResponse.getTook().toHumanReadableString(1),
- bulkByScrollResponse.getDeleted(),
- bulkByScrollResponse.getBatches(),
- bulkByScrollResponse.getNoops(),
- bulkByScrollResponse.getVersionConflicts(),
- bulkByScrollResponse.getSearchRetries(),
- bulkByScrollResponse.getBulkRetries(),
- queryBuilder);
- }
+ waitForTaskComplete(deleteByQueryRequest, taskResponse);
return true;
} catch (Exception e) {
@@ -1942,7 +1969,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
CountRequest countRequest = new
CountRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder();
- searchSourceBuilder.query(isItemTypeSharingIndex(itemType) ?
wrapWithItemTypeQuery(itemType, filter) : filter);
+ searchSourceBuilder.query(wrapWithItemTypeQuery(itemType,
filter));
countRequest.source(searchSourceBuilder);
CountResponse response = client.count(countRequest,
RequestOptions.DEFAULT);
return response.getCount();
@@ -1977,7 +2004,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder()
.fetchSource(true)
.seqNoAndPrimaryTerm(true)
- .query(isItemTypeSharingIndex(itemType) ?
wrapWithItemTypeQuery(itemType, query) : query)
+ .query(wrapWithItemTypeQuery(itemType, query))
.size(size < 0 ? defaultQueryLimit : size)
.from(offset);
if (scrollTimeValidity != null) {
@@ -2281,15 +2308,12 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
if (filter != null) {
- searchSourceBuilder.query(isItemTypeSharingIndex ?
- wrapWithItemTypeQuery(itemType,
conditionESQueryBuilderDispatcher.buildFilter(filter)) :
-
conditionESQueryBuilderDispatcher.buildFilter(filter));
+
searchSourceBuilder.query(wrapWithItemTypeQuery(itemType,
conditionESQueryBuilderDispatcher.buildFilter(filter)));
}
} else {
if (filter != null) {
- AggregationBuilder filterAggregation =
AggregationBuilders.filter("filter", isItemTypeSharingIndex ?
- wrapWithItemTypeQuery(itemType,
conditionESQueryBuilderDispatcher.buildFilter(filter)) :
-
conditionESQueryBuilderDispatcher.buildFilter(filter));
+ AggregationBuilder filterAggregation =
AggregationBuilders.filter("filter",
+ wrapWithItemTypeQuery(itemType,
conditionESQueryBuilderDispatcher.buildFilter(filter)));
for (AggregationBuilder aggregationBuilder :
lastAggregation) {
filterAggregation.subAggregation(aggregationBuilder);
}
@@ -2666,10 +2690,33 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder
originalQuery) {
- BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
- wrappedQuery.must(getItemTypeQueryBuilder(itemType));
- wrappedQuery.must(originalQuery);
- return wrappedQuery;
+ if (isItemTypeSharingIndex(itemType)) {
+ BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
+ wrappedQuery.must(getItemTypeQueryBuilder(itemType));
+ wrappedQuery.must(originalQuery);
+ return wrappedQuery;
+ }
+ return originalQuery;
+ }
+
+ private QueryBuilder wrapWithItemsTypeQuery(String[] itemTypes,
QueryBuilder originalQuery) {
+ if (itemTypes.length == 1) {
+ return wrapWithItemTypeQuery(itemTypes[0], originalQuery);
+ }
+
+ if (Arrays.stream(itemTypes).anyMatch(this::isItemTypeSharingIndex)) {
+ BoolQueryBuilder itemTypeQuery = QueryBuilders.boolQuery();
+ itemTypeQuery.minimumShouldMatch(1);
+ for (String itemType : itemTypes) {
+ itemTypeQuery.should(getItemTypeQueryBuilder(itemType));
+ }
+
+ BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
+ wrappedQuery.filter(itemTypeQuery);
+ wrappedQuery.must(originalQuery);
+ return wrappedQuery;
+ }
+ return originalQuery;
}
private QueryBuilder getItemTypeQueryBuilder(String itemType) {
diff --git
a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
new file mode 100644
index 000000000..8fff8dea6
--- /dev/null
+++
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+
+import java.io.IOException;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * A custom Rest high level client that provide a way of using Task system on
updateByQuery and deleteByQuery,
+ * by returning the response immediately (wait_for_completion set to false)
+ * see org.elasticsearch.client.RestHighLevelClient for original code.
+ */
+public class CustomRestHighLevelClient extends RestHighLevelClient {
+
+ public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) {
+ super(restClientBuilder);
+ }
+
+ /**
+ * Executes a delete by query request.
+ * See <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
+ * Delete By Query API on elastic.co</a>
+ *
+ * @param deleteByQueryRequest the request
+ * @param options the request options (e.g. headers), use
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ */
+ public final TaskSubmissionResponse
submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions
options) throws IOException {
+ return performRequestAndParseEntity(
+ deleteByQueryRequest, innerDeleteByQueryRequest -> {
+ Request request =
RequestConverters.deleteByQuery(innerDeleteByQueryRequest);
+ request.addParameter("wait_for_completion", "false");
+ return request;
+ }, options, TaskSubmissionResponse::fromXContent, emptySet()
+ );
+ }
+
+ /**
+ * Executes a update by query request.
+ * See <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
+ * Update By Query API on elastic.co</a>
+ *
+ * @param updateByQueryRequest the request
+ * @param options the request options (e.g. headers), use
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ */
+ public final TaskSubmissionResponse
submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions
options) throws IOException {
+ return performRequestAndParseEntity(
+ updateByQueryRequest, innerUpdateByQueryRequest -> {
+ Request request =
RequestConverters.updateByQuery(updateByQueryRequest);
+ request.addParameter("wait_for_completion", "false");
+ return request;
+ }, options, TaskSubmissionResponse::fromXContent, emptySet()
+ );
+ }
+}
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 507d8789f..32efdd022 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -63,7 +63,9 @@
<cm:property name="maximalElasticSearchVersion" value="8.0.0" />
<cm:property name="aggregateQueryBucketSize" value="5000" />
- <cm:property name="clientSocketTimeout" value="" />
+ <cm:property name="clientSocketTimeout" value="-1" />
+ <cm:property name="taskWaitingTimeout" value="3600000" />
+ <cm:property name="taskWaitingPollingInterval" value="1000" />
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
<cm:property name="itemTypeToRefreshPolicy" value="" />
@@ -151,6 +153,8 @@
<property name="itemTypeToRefreshPolicy"
value="${es.itemTypeToRefreshPolicy}" />
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}"
/>
+ <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" />
+ <property name="taskWaitingPollingInterval"
value="${es.taskWaitingPollingInterval}" />
<property name="metricsService" ref="metricsService" />
<property name="useBatchingForSave" value="${es.useBatchingForSave}" />
diff --git
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 086941e80..224d01110 100644
---
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -73,8 +73,22 @@
maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000}
# Disable partitions on aggregation queries for past events.
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false}
-# max socket timeout in millis
-clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the
timeout for waiting for data or, put differently, the maximum period of
inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative
value is interpreted as undefined (system default).
+# Default: -1
+clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1}
+
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations like update_by_query and delete_by_query are delegated to
ElasticSearch using tasks
+# For consistency, the thread that triggers one of those operations will wait
for the task to be completed on the ElasticSearch side.
+# This timeout configuration is here to ensure not blocking the thread
infinitely, in case of very long running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (1 hour)
+taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000}
+
+# Defines the polling interval in milliseconds, which is used to check if task
is completed on ElasticSearch side
+# Default: 1000 (1 second)
+taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000}
# refresh policy per item type in Json.
# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is
NONE.
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 29c196a2b..0fe374616 100644
---
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -248,6 +248,20 @@ public interface PersistenceService {
return updateWithQueryAndStoredScript(null, clazz, scripts,
scriptParams, conditions);
}
+ /**
+ * Updates the items of the specified classes matching a query, based on the
provided stored scripts and script parameters.
+ * This variant can perform the update on multiple item types in a single
run; be careful with your query, as it will be executed against all of them.
+ *
+ * @param classes classes of items to update, be careful all of them
will be submitted to update for all scripts/conditions
+ * @param scripts Stored scripts name
+ * @param scriptParams script params array
+ * @param conditions conditions array
+ * @param waitForComplete if true, wait for the ES execution to be complete
+ * @return {@code true} if the update was successful, {@code false}
otherwise
+ */
+ boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[]
scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean
waitForComplete);
+
/**
* @deprecated use {@link #updateWithQueryAndStoredScript(Class, String[],
Map[], Condition[])}
*/
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 14b6f9f52..59ef60646 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -167,8 +167,7 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
Map<String, Object>[] scriptParams = new
Map[]{Collections.singletonMap("profileId", masterProfileId)};
Condition[] conditions = new
Condition[]{profileIdsCondition};
-
persistenceService.updateWithQueryAndStoredScript(Session.class, scripts,
scriptParams, conditions);
-
persistenceService.updateWithQueryAndStoredScript(Event.class, scripts,
scriptParams, conditions);
+ persistenceService.updateWithQueryAndStoredScript(new
Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
} else {
for (String mergedProfileId : mergedProfileIds) {
privacyService.anonymizeBrowsingData(mergedProfileId);