This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c7cb996ee941b649f4d270ce4365aa7eaf910ed6 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Tue Jun 21 15:03:11 2022 +0700 JAMES-3771 Migrate backend opensearch high level to new java client --- backends-common/opensearch/pom.xml | 15 +- .../james/backends/opensearch/ClientProvider.java | 34 ++- .../opensearch/DeleteByQueryPerformer.java | 41 ++-- .../james/backends/opensearch/DocumentId.java | 3 +- .../backends/opensearch/IndexCreationFactory.java | 254 ++++++++++++--------- .../backends/opensearch/OpenSearchHealthCheck.java | 38 +-- .../backends/opensearch/OpenSearchIndexer.java | 121 ++++++---- .../opensearch/ReactorOpenSearchClient.java | 181 ++++++--------- .../james/backends/opensearch/RoutingKey.java | 3 +- .../backends/opensearch/UpdatedRepresentation.java | 3 +- .../backends/opensearch/search/ScrolledSearch.java | 74 +++--- .../ClientProviderImplConnectionContract.java | 12 +- .../opensearch/DockerOpenSearchExtension.java | 42 ++-- .../opensearch/IndexCreationFactoryTest.java | 213 ++++++++--------- .../opensearch/OpenSearchHealthCheckTest.java | 52 ++--- .../backends/opensearch/OpenSearchIndexerTest.java | 76 +++--- .../opensearch/search/ScrolledSearchTest.java | 135 ++++++----- 17 files changed, 690 insertions(+), 607 deletions(-) diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml index ce40d7dca4..a53014a685 100644 --- a/backends-common/opensearch/pom.xml +++ b/backends-common/opensearch/pom.xml @@ -47,14 +47,10 @@ <artifactId>testing-base</artifactId> <scope>test</scope> </dependency> - - <!-- Prevents https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-28491 --> <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-cbor</artifactId> - <version>2.13.4</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> </dependency> - <dependency> <groupId>com.github.fge</groupId> <artifactId>throwing-lambdas</artifactId> @@ -86,7 +82,12 @@ </dependency> <dependency> <groupId>org.opensearch.client</groupId> - <artifactId>opensearch-rest-high-level-client</artifactId> + <artifactId>opensearch-java</artifactId> + <version>2.0.0</version> + </dependency> + <dependency> + <groupId>org.opensearch.client</groupId> + <artifactId>opensearch-rest-client</artifactId> <version>2.3.0</version> </dependency> <dependency> diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java index 52bf765741..89a104bb8b 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java @@ -46,7 +46,9 @@ import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.TrustStrategy; import org.apache.james.util.concurrent.NamedThreadFactory; import org.opensearch.client.RestClient; -import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.transport.rest_client.RestClientTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,7 +183,8 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); private final OpenSearchConfiguration configuration; - private final RestHighLevelClient openSearchRestHighLevelClient; + private final RestClient lowLevelRestClient; + private final OpenSearchAsyncClient openSearchClient; private final HttpAsyncClientConfigurer httpAsyncClientConfigurer; private final ReactorOpenSearchClient client; @@ -189,28 +192,35 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { public ClientProvider(OpenSearchConfiguration configuration) { this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration); this.configuration = configuration; - this.openSearchRestHighLevelClient = connect(configuration); - this.client = new ReactorOpenSearchClient(this.openSearchRestHighLevelClient); + this.lowLevelRestClient = buildRestClient(); + this.openSearchClient = connect(); + this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient); } - private RestHighLevelClient connect(OpenSearchConfiguration configuration) { + private RestClient buildRestClient() { + return RestClient.builder(hostsToHttpHosts()) + .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure) + .build(); + } + + private OpenSearchAsyncClient connect() { Duration waitDelay = Duration.ofMillis(configuration.getMinDelay()); boolean suppressLeadingZeroElements = true; boolean suppressTrailingZeroElements = true; return Mono.fromCallable(this::connectToCluster) .doOnError(e -> LOGGER.warn("Error establishing OpenSearch connection. Next retry scheduled in {}", DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e)) - .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.boundedElastic())) + .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.elastic())) + .publishOn(Schedulers.elastic()) .block(); } - private RestHighLevelClient connectToCluster() { + private OpenSearchAsyncClient connectToCluster() { LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now()); - return new RestHighLevelClient( - RestClient - .builder(hostsToHttpHosts()) - .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)); + RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper()); + + return new OpenSearchAsyncClient(transport); } private HttpHost[] hostsToHttpHosts() { @@ -226,6 +236,6 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { @PreDestroy public void close() throws IOException { - openSearchRestHighLevelClient.close(); + lowLevelRestClient.close(); } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java index a0261badd8..7ea40f627e 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java @@ -19,9 +19,10 @@ package org.apache.james.backends.opensearch; -import org.opensearch.client.RequestOptions; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.reindex.DeleteByQueryRequest; +import java.io.IOException; + +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.DeleteByQueryRequest; import reactor.core.publisher.Mono; @@ -35,20 +36,32 @@ public class DeleteByQueryPerformer { this.aliasName = aliasName; } - public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) { - DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue()); - deleteRequest.setQuery(queryBuilder); - deleteRequest.setRouting(routingKey.asString()); + public Mono<Void> perform(Query query, RoutingKey routingKey) { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder() + .index(aliasName.getValue()) + .query(query) + .routing(routingKey.asString()) + .build(); - return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT) - .then(); + try { + return client.deleteByQuery(deleteRequest) + .then(); + } catch (IOException e) { + return Mono.error(e); + } } - public Mono<Void> perform(QueryBuilder queryBuilder) { - DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue()); - deleteRequest.setQuery(queryBuilder); + public Mono<Void> perform(Query query) { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder() + .index(aliasName.getValue()) + .query(query) + .build(); - return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT) - .then(); + try { + return client.deleteByQuery(deleteRequest) + .then(); + } catch (IOException e) { + return Mono.error(e); + } } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java index 42f650b03b..be9a994539 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java @@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch; import java.util.Objects; -import org.opensearch.common.Strings; - import com.google.common.base.Preconditions; +import com.google.common.base.Strings; public class DocumentId { diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java index 48756d104c..f314b373cc 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java @@ -19,58 +19,76 @@ package org.apache.james.backends.opensearch; -import static org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; -import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; - -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import javax.inject.Inject; -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.indices.CreateIndexRequest; -import org.opensearch.client.indices.GetIndexRequest; -import org.opensearch.common.Strings; -import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.common.xcontent.XContentType; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.WaitForActiveShards; +import org.opensearch.client.opensearch._types.analysis.Analyzer; +import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer; +import org.opensearch.client.opensearch._types.analysis.CustomNormalizer; +import org.opensearch.client.opensearch._types.analysis.Normalizer; +import org.opensearch.client.opensearch._types.analysis.SnowballLanguage; +import org.opensearch.client.opensearch._types.analysis.SnowballTokenFilter; +import org.opensearch.client.opensearch._types.analysis.Tokenizer; +import org.opensearch.client.opensearch._types.mapping.TypeMapping; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.ExistsAliasRequest; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.IndexSettings; +import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis; +import org.opensearch.client.opensearch.indices.UpdateAliasesRequest; +import org.opensearch.client.opensearch.indices.update_aliases.Action; +import org.opensearch.client.opensearch.indices.update_aliases.AddAction; +import org.opensearch.client.transport.endpoints.BooleanResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class IndexCreationFactory { - public static class IndexCreationCustomElement { - public static IndexCreationCustomElement EMPTY = from("{}"); + public static class IndexCreationCustomAnalyzer { + private final String key; + private final Analyzer analyzer; - public static IndexCreationCustomElement from(String value) { - try { - new ObjectMapper().readTree(value); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("value must be a valid json"); - } - return new IndexCreationCustomElement(value); + public IndexCreationCustomAnalyzer(String key, Analyzer analyzer) { + this.key = key; + this.analyzer = analyzer; + } + + public String getKey() { + return key; + } + + public Analyzer getAnalyzer() { + return analyzer; } + } - private final String payload; + public static class IndexCreationCustomTokenizer { + private final String key; + private final Tokenizer tokenizer; - IndexCreationCustomElement(String payload) { - this.payload = payload; + public IndexCreationCustomTokenizer(String key, Tokenizer tokenizer) { + this.key = key; + this.tokenizer = tokenizer; } - public String getPayload() { - return payload; + public String getKey() { + return key; + } + + public Tokenizer getTokenizer() { + return tokenizer; } } @@ -103,8 +121,8 @@ public class IndexCreationFactory { private final int waitForActiveShards; private final IndexName indexName; private final ImmutableList.Builder<AliasName> aliases; - private Optional<IndexCreationCustomElement> customAnalyzers; - private Optional<IndexCreationCustomElement> customTokenizers; + private final ImmutableList.Builder<IndexCreationCustomAnalyzer> customAnalyzers; + private final ImmutableList.Builder<IndexCreationCustomTokenizer> customTokenizers; FinalStage(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName) { this.nbShards = nbShards; @@ -112,8 +130,8 @@ public class IndexCreationFactory { this.waitForActiveShards = waitForActiveShards; this.indexName = indexName; this.aliases = ImmutableList.builder(); - this.customAnalyzers = Optional.empty(); - this.customTokenizers = Optional.empty(); + this.customAnalyzers = ImmutableList.builder(); + this.customTokenizers = ImmutableList.builder(); } public FinalStage addAlias(AliasName... aliases) { @@ -126,29 +144,29 @@ public class IndexCreationFactory { return this; } - public FinalStage customAnalyzers(IndexCreationCustomElement customAnalyzers) { - this.customAnalyzers = Optional.of(customAnalyzers); + public FinalStage customAnalyzers(IndexCreationCustomAnalyzer... customAnalyzers) { + this.customAnalyzers.add(customAnalyzers); return this; } - public FinalStage customTokenizers(IndexCreationCustomElement customTokenizers) { - this.customTokenizers = Optional.of(customTokenizers); + public FinalStage customTokenizers(IndexCreationCustomTokenizer... customTokenizers) { + this.customTokenizers.add(customTokenizers); return this; } public IndexCreationPerformer build() { - return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers, customTokenizers); + return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers.build(), customTokenizers.build()); } public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client) { return build().createIndexAndAliases(client, Optional.empty(), Optional.empty()); } - public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, XContentBuilder mappingContent) { + public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, TypeMapping mappingContent) { return build().createIndexAndAliases(client, Optional.empty(), Optional.of(mappingContent)); } - public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings, Optional<XContentBuilder> mappingContent) { + public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<IndexSettings> indexSettings, Optional<TypeMapping> mappingContent) { return build().createIndexAndAliases(client, indexSettings, mappingContent); } } @@ -164,11 +182,12 @@ public class IndexCreationFactory { private final int waitForActiveShards; private final IndexName indexName; private final ImmutableList<AliasName> aliases; - private final Optional<IndexCreationCustomElement> customAnalyzers; - private final Optional<IndexCreationCustomElement> customTokenizers; + private final ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers; + private final ImmutableList<IndexCreationCustomTokenizer> customTokenizers; - private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName, ImmutableList<AliasName> aliases, - Optional<IndexCreationCustomElement> customAnalyzers, Optional<IndexCreationCustomElement> customTokenizers) { + private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName, + ImmutableList<AliasName> aliases, ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers, + ImmutableList<IndexCreationCustomTokenizer> customTokenizers) { this.nbShards = nbShards; this.nbReplica = nbReplica; this.waitForActiveShards = waitForActiveShards; @@ -178,8 +197,9 @@ public class IndexCreationFactory { this.customTokenizers = customTokenizers; } - public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings, - Optional<XContentBuilder> mappingContent) { + public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, + Optional<IndexSettings> indexSettings, + Optional<TypeMapping> mappingContent) { Preconditions.checkNotNull(indexName); try { createIndexIfNeeded(client, indexName, indexSettings.orElse(generateSetting()), mappingContent); @@ -193,31 +213,41 @@ public class IndexCreationFactory { private void createAliasIfNeeded(ReactorOpenSearchClient client, IndexName indexName, AliasName aliasName) throws IOException { if (!aliasExist(client, aliasName)) { - client.indices() - .updateAliases( - new IndicesAliasesRequest().addAliasAction( - new AliasActions(AliasActions.Type.ADD) + client.updateAliases( + new UpdateAliasesRequest.Builder() + .actions(new Action.Builder() + .add(new AddAction.Builder() .index(indexName.getValue()) - .alias(aliasName.getValue())), - RequestOptions.DEFAULT); + .alias(aliasName.getValue()) + .build()) + .build()) + .build()) + .block(); } } private boolean aliasExist(ReactorOpenSearchClient client, AliasName aliasName) throws IOException { - return client.indices() - .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT); + return client.aliasExists(new ExistsAliasRequest.Builder() + .name(aliasName.getValue()) + .build()) + .map(BooleanResponse::value) + .block(); } - private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, XContentBuilder settings, Optional<XContentBuilder> mappingContent) throws IOException { + private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, IndexSettings settings, Optional<TypeMapping> mappingContent) throws IOException { try { if (!indexExists(client, indexName)) { - CreateIndexRequest request = new CreateIndexRequest(indexName.getValue()).source(settings); - mappingContent.ifPresent(request::mapping); - client.indices().create( - request, - RequestOptions.DEFAULT); + CreateIndexRequest.Builder request = new CreateIndexRequest.Builder() + .index(indexName.getValue()) + .waitForActiveShards(new WaitForActiveShards.Builder() + .count(waitForActiveShards) + .build()) + .settings(settings); + mappingContent.ifPresent(request::mappings); + client.createIndex(request.build()) + .block(); } - } catch (OpenSearchStatusException exception) { + } catch (OpenSearchException exception) { if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) { LOGGER.info("Index [{}] already exists", indexName.getValue()); } else { @@ -227,60 +257,66 @@ public class IndexCreationFactory { } private boolean indexExists(ReactorOpenSearchClient client, IndexName indexName) throws IOException { - return client.indices().exists(new GetIndexRequest(indexName.getValue()), RequestOptions.DEFAULT); + return client.indexExists(new ExistsRequest.Builder() + .index(indexName.getValue()) + .build()) + .map(BooleanResponse::value) + .block(); + } + + private IndexSettings generateSetting() { + return new IndexSettings.Builder() + .numberOfShards(Integer.toString(nbShards)) + .numberOfReplicas(Integer.toString(nbReplica)) + .analysis(new IndexSettingsAnalysis.Builder() + .normalizer(CASE_INSENSITIVE, new Normalizer.Builder() + .custom(generateNormalizer()) + .build()) + .analyzer(generateAnalyzers()) + .tokenizer(generateTokenizers()) + .build()) + .build(); } - private XContentBuilder generateSetting() throws IOException { - return jsonBuilder() - .startObject() - .startObject("settings") - .field("number_of_shards", nbShards) - .field("number_of_replicas", nbReplica) - .field("index.write.wait_for_active_shards", waitForActiveShards) - .startObject("analysis") - .startObject("normalizer") - .startObject(CASE_INSENSITIVE) - .field("type", "custom") - .startArray("char_filter") - .endArray() - .startArray("filter") - .value("lowercase") - .value("asciifolding") - .endArray() - .endObject() - .endObject() - .rawField(ANALYZER, generateAnalyzers(), XContentType.JSON) - .rawField(TOKENIZER, generateTokenizer(), XContentType.JSON) - .endObject() - .endObject() - .endObject(); + private CustomNormalizer generateNormalizer() { + return new CustomNormalizer.Builder() + .filter("lowercase", "asciifolding") + .build(); } - private String analyzerDefault() throws IOException { - XContentBuilder analyzerBuilder = jsonBuilder() - .startObject() - .startObject(KEEP_MAIL_AND_URL) - .field("tokenizer", "uax_url_email") - .startArray("filter") - .value("lowercase") - .value("stop") - .endArray() - .endObject() - .endObject(); - - return Strings.toString(analyzerBuilder); + private SnowballTokenFilter generateFilter() { + return new SnowballTokenFilter.Builder() + .language(SnowballLanguage.English) + .build(); } - private InputStream generateAnalyzers() { - return new ByteArrayInputStream(customAnalyzers.orElseGet(Throwing.supplier(() -> IndexCreationCustomElement.from(analyzerDefault())).sneakyThrow()) - .getPayload() - .getBytes(StandardCharsets.UTF_8)); + private Map<String, Analyzer> defaultAnalyzers() { + return ImmutableMap.of( + KEEP_MAIL_AND_URL, new Analyzer.Builder().custom( + new CustomAnalyzer.Builder() + .tokenizer("uax_url_email") + .filter("lowercase", "stop") + .build()) + .build() + ); + } + + private Map<String, Analyzer> generateAnalyzers() { + if (customAnalyzers.isEmpty()) { + return defaultAnalyzers(); + } + return customAnalyzers.stream() + .collect(Collectors.toMap( + IndexCreationCustomAnalyzer::getKey, + IndexCreationCustomAnalyzer::getAnalyzer)); } - private InputStream generateTokenizer() { - return new ByteArrayInputStream(customTokenizers.orElse(IndexCreationCustomElement.EMPTY) - .getPayload() - .getBytes(StandardCharsets.UTF_8)); + private Map<String, Tokenizer> generateTokenizers() { + return customTokenizers.stream() + .collect(Collectors.toMap( + IndexCreationCustomTokenizer::getKey, + IndexCreationCustomTokenizer::getTokenizer + )); } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java index de26140daf..49874fc8f9 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java @@ -19,7 +19,10 @@ package org.apache.james.backends.opensearch; +import java.io.IOException; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -27,9 +30,8 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.james.core.healthcheck.ComponentName; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; -import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.client.Requests; +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; import com.google.common.annotations.VisibleForTesting; @@ -54,24 +56,30 @@ public class OpenSearchHealthCheck implements HealthCheck { @Override public Mono<Result> check() { - String[] indices = indexNames.stream() + List<String> indices = indexNames.stream() .map(IndexName::getValue) - .toArray(String[]::new); - ClusterHealthRequest request = Requests.clusterHealthRequest(indices); + .collect(Collectors.toList()); + HealthRequest request = new HealthRequest.Builder() + .index(indices) + .build(); - return client.health(request) - .map(this::toHealthCheckResult) - .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e))); + try { + return client.health(request) + .map(this::toHealthCheckResult) + .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e))); + } catch (IOException e) { + return Mono.error(e); + } } @VisibleForTesting - Result toHealthCheckResult(ClusterHealthResponse response) { - switch (response.getStatus()) { - case GREEN: - case YELLOW: + Result toHealthCheckResult(HealthResponse response) { + switch (response.status()) { + case Green: + case Yellow: return Result.healthy(COMPONENT_NAME); - case RED: - return Result.unhealthy(COMPONENT_NAME, response.getClusterName() + " status is RED"); + case Red: + return Result.unhealthy(COMPONENT_NAME, response.clusterName() + " status is RED"); default: throw new NotImplementedException("Un-handled OpenSearch cluster status"); } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java index 9a0002cf91..19c46e0dac 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java @@ -18,24 +18,23 @@ ****************************************************************/ package org.apache.james.backends.opensearch; +import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.commons.lang3.StringUtils; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.action.get.GetRequest; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.update.UpdateRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.common.ValidationException; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.index.query.QueryBuilder; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.GetRequest; +import org.opensearch.client.opensearch.core.GetResponse; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.util.RawValue; import com.google.common.base.Preconditions; import reactor.core.publisher.Mono; @@ -59,11 +58,17 @@ public class OpenSearchIndexer { public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) { checkArgument(content); logContent(id, content); - return client.index(new IndexRequest(aliasName.getValue()) + + try { + return client.index(new IndexRequest.Builder<>() + .index(aliasName.getValue()) .id(id.asString()) - .source(content, XContentType.JSON) - .routing(routingKey.asString()), - RequestOptions.DEFAULT); + .document(new RawValue(content)) + .routing(routingKey.asString()) + .build()); + } catch (IOException e) { + return Mono.error(e); + } } private void logContent(DocumentId id, String content) { @@ -75,50 +80,70 @@ public class OpenSearchIndexer { public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) { Preconditions.checkNotNull(updatedDocumentParts); Preconditions.checkNotNull(routingKey); - BulkRequest request = new BulkRequest(); - updatedDocumentParts.forEach(updatedDocumentPart -> request.add( - new UpdateRequest(aliasName.getValue(), - updatedDocumentPart.getId().asString()) - .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON) - .routing(routingKey.asString()))); - - return client.bulk(request, RequestOptions.DEFAULT) - .onErrorResume(ValidationException.class, exception -> { - LOGGER.warn("Error while updating index", exception); - return Mono.empty(); - }); + + if (updatedDocumentParts.isEmpty()) { + return Mono.empty(); + } + + BulkRequest.Builder bulkBuilder = new BulkRequest.Builder(); + updatedDocumentParts.forEach(updatedDocumentPart -> bulkBuilder.operations( + op -> op.update(idx -> idx + .index(aliasName.getValue()) + .id(updatedDocumentPart.getId().asString()) + .document(Collections.singletonMap("doc", new RawValue(updatedDocumentPart.getUpdatedDocumentPart()))) + .routing(routingKey.asString()) + ))); + + try { + return client.bulk(bulkBuilder.build()); + } catch (IOException e) { + return Mono.error(e); + } } public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) { - BulkRequest request = new BulkRequest(); - ids.forEach(id -> request.add( - new DeleteRequest(aliasName.getValue()) + if (ids.isEmpty()) { + return Mono.empty(); + } + + BulkRequest.Builder bulkBuilder = new BulkRequest.Builder(); + + ids.forEach(id -> bulkBuilder.operations( + op -> op.delete(idx -> idx + .index(aliasName.getValue()) .id(id.asString()) - .routing(routingKey.asString()))); + .routing(routingKey.asString()) + ))); - return client.bulk(request, RequestOptions.DEFAULT) - .onErrorResume(ValidationException.class, exception -> { - LOGGER.warn("Error while deleting index", exception); - return Mono.empty(); - }); + try { + return client.bulk(bulkBuilder.build()); + } catch (IOException e) { + return Mono.error(e); + } } - public Mono<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) { - return deleteByQueryPerformer.perform(queryBuilder, routingKey); + public Mono<Void> deleteAllMatchingQuery(Query query, RoutingKey routingKey) { + return deleteByQueryPerformer.perform(query, routingKey); } private void checkArgument(String content) { Preconditions.checkArgument(content != null, "content should be provided"); } - public Mono<GetResponse> get(DocumentId id, RoutingKey routingKey) { - return Mono.fromRunnable(() -> { - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(routingKey); - }) - .then(client.get(new GetRequest(aliasName.getValue()) - .id(id.asString()) - .routing(routingKey.asString()), - RequestOptions.DEFAULT)); + public Mono<GetResponse<ObjectNode>> get(DocumentId id, RoutingKey routingKey) { + try { + return Mono.fromRunnable(() -> { + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(routingKey); + }) + .then(client.get( + new GetRequest.Builder() + .index(aliasName.getValue()) + .id(id.asString()) + .routing(routingKey.asString()) + .build())); + } catch (IOException e) { + return Mono.error(e); + } } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java index 1c739fcf37..d2818e2ad3 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java @@ -20,160 +20,129 @@ package org.apache.james.backends.opensearch; import java.io.IOException; -import java.util.function.Consumer; - -import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest; -import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptRequest; -import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.explain.ExplainRequest; -import org.opensearch.action.explain.ExplainResponse; -import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest; -import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse; -import org.opensearch.action.get.GetRequest; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.ClearScrollResponse; -import org.opensearch.action.search.MultiSearchRequest; -import org.opensearch.action.search.MultiSearchResponse; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.IndicesClient; -import org.opensearch.client.RequestOptions; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + import org.opensearch.client.RestClient; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.client.core.MainResponse; -import org.opensearch.index.rankeval.RankEvalRequest; -import org.opensearch.index.rankeval.RankEvalResponse; -import org.opensearch.index.reindex.BulkByScrollResponse; -import org.opensearch.index.reindex.DeleteByQueryRequest; -import org.opensearch.script.mustache.MultiSearchTemplateRequest; -import org.opensearch.script.mustache.MultiSearchTemplateResponse; -import org.opensearch.script.mustache.SearchTemplateRequest; -import org.opensearch.script.mustache.SearchTemplateResponse; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.ClearScrollRequest; +import org.opensearch.client.opensearch.core.ClearScrollResponse; +import org.opensearch.client.opensearch.core.DeleteByQueryRequest; +import org.opensearch.client.opensearch.core.DeleteByQueryResponse; +import org.opensearch.client.opensearch.core.DeleteRequest; +import org.opensearch.client.opensearch.core.DeleteResponse; +import org.opensearch.client.opensearch.core.GetRequest; +import org.opensearch.client.opensearch.core.GetResponse; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; +import org.opensearch.client.opensearch.core.InfoResponse; +import org.opensearch.client.opensearch.core.ScrollRequest; +import org.opensearch.client.opensearch.core.ScrollResponse; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.indices.ExistsAliasRequest; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.UpdateAliasesRequest; +import org.opensearch.client.opensearch.indices.UpdateAliasesResponse; +import org.opensearch.client.transport.endpoints.BooleanResponse; + +import com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.core.scheduler.Schedulers; public class ReactorOpenSearchClient implements AutoCloseable { - private final RestHighLevelClient client; + private final OpenSearchAsyncClient client; + private final RestClient lowLevelRestClient; - public ReactorOpenSearchClient(RestHighLevelClient client) { + public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) { this.client = client; + this.lowLevelRestClient = lowLevelRestClient; } - public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) { - return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener)); - } - - public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) { - return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener)); - } - - public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { - return client.delete(deleteRequest, options); - } - - public Mono<BulkByScrollResponse> deleteByQuery(DeleteByQueryRequest deleteRequest, RequestOptions options) { - return toReactor(listener -> client.deleteByQueryAsync(deleteRequest, options, listener)); + public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException { + return toReactor(client.bulk(bulkRequest)); } - public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) { - return toReactor(listener -> client.deleteScriptAsync(request, options, listener)); + public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest) throws IOException { + return toReactor(client.clearScroll(clearScrollRequest)); } - public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) { - return toReactor(listener -> client.explainAsync(explainRequest, options, listener)); + public Mono<DeleteResponse> delete(DeleteRequest deleteRequest) throws IOException { + return toReactor(client.delete(deleteRequest)); } - public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) { - return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener)); + public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest deleteRequest) throws IOException { + return toReactor(client.deleteByQuery(deleteRequest)); } public RestClient getLowLevelClient() { - return client.getLowLevelClient(); + return lowLevelRestClient; } - public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) { - return toReactor(listener -> client.getScriptAsync(request, options, listener)); + public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException { + return toReactor(client.index(indexRequest)); } - public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) { - return toReactor(listener -> client.indexAsync(indexRequest, options, listener)); + public Mono<BooleanResponse> indexExists(ExistsRequest existsRequest) throws IOException { + return toReactor(client.indices().exists(existsRequest)); } - public IndicesClient indices() { - return client.indices(); + public Mono<BooleanResponse> aliasExists(ExistsAliasRequest existsAliasRequest) throws IOException { + return toReactor(client.indices().existsAlias(existsAliasRequest)); } - public MainResponse info(RequestOptions options) throws IOException { - return client.info(options); + public Mono<CreateIndexResponse> createIndex(CreateIndexRequest indexRequest) throws IOException { + return toReactor(client.indices().create(indexRequest)); } - public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) { - return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener)); + public Mono<UpdateAliasesResponse> updateAliases(UpdateAliasesRequest updateAliasesRequest) throws IOException { + return toReactor(client.indices().updateAliases(updateAliasesRequest)); } - public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) { - return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener)); + public Mono<InfoResponse> info() throws IOException { + return toReactor(client.info()); } - public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) { - return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener)); + public Mono<ScrollResponse<ObjectNode>> scroll(ScrollRequest scrollRequest) throws IOException { + return toReactor(client.scroll(scrollRequest, ObjectNode.class)); } - public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) { - return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener)); + public Mono<SearchResponse<ObjectNode>> search(SearchRequest searchRequest) throws IOException { + return toReactor(client.search(searchRequest, ObjectNode.class)); } - public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) { - return toReactor(listener -> client.searchAsync(searchRequest, options, listener)); + public Mono<HealthResponse> health(HealthRequest request) throws IOException { + return toReactor(client.cluster().health(request)); } - public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) { - return toReactor(listener -> client.cluster() - .healthAsync(request, RequestOptions.DEFAULT, listener)); - } - - public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) { - return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener)); - } - - public Mono<GetResponse> get(GetRequest getRequest, RequestOptions options) { - return toReactor(listener -> client.getAsync(getRequest, options, listener)); + public Mono<GetResponse<ObjectNode>> get(GetRequest getRequest) throws IOException { + return toReactor(client.get(getRequest, ObjectNode.class)); } @Override public void close() throws IOException { - client.close(); + lowLevelRestClient.close(); } - private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) { - return Mono.<T>create(sink -> async.accept(getListener(sink))) - .publishOn(Schedulers.boundedElastic()); + private static <T> Mono<T> toReactor(CompletableFuture<T> async) { + return Mono.<T>create(sink -> async.whenComplete(getFuture(sink))) + .publishOn(Schedulers.elastic()); } - private static <T> ActionListener<T> getListener(MonoSink<T> sink) { - return new ActionListener<T>() { - @Override - public void onResponse(T t) { - sink.success(t); - } - - @Override - public void onFailure(Exception e) { - sink.error(e); + private static <T> BiConsumer<? super T, ? super Throwable> getFuture(MonoSink<T> sink) { + return (response, exception) -> { + if (exception != null) { + sink.error(exception); + } else { + sink.success(response); } }; } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java index bb823f8e9f..4dedbc67fe 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java @@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch; import java.util.Objects; -import org.opensearch.common.Strings; - import com.google.common.base.Preconditions; +import com.google.common.base.Strings; public class RoutingKey { public interface Factory<T> { diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java index f221ccd45c..76c415b9e1 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java @@ -20,10 +20,9 @@ package org.apache.james.backends.opensearch; import java.util.Objects; -import org.opensearch.common.Strings; - import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; public class UpdatedRepresentation { private final DocumentId id; diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java index ecb9cf955a..3a5d22d1ce 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java @@ -19,19 +19,20 @@ package org.apache.james.backends.opensearch.search; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.james.backends.opensearch.ReactorOpenSearchClient; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.search.SearchHit; - +import org.opensearch.client.opensearch._types.Time; +import org.opensearch.client.opensearch.core.ClearScrollRequest; +import org.opensearch.client.opensearch.core.ScrollRequest; +import org.opensearch.client.opensearch.core.ScrollResponse; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.search.Hit; + +import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; @@ -39,8 +40,9 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; public class ScrolledSearch { - - private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + private static final Time TIMEOUT = new Time.Builder() + .time("1m") + .build(); private final ReactorOpenSearchClient client; private final SearchRequest searchRequest; @@ -50,12 +52,12 @@ public class ScrolledSearch { this.searchRequest = searchRequest; } - public Flux<SearchHit> searchHits() { + public Flux<Hit<ObjectNode>> searchHits() { return searchResponses() - .concatMap(searchResponse -> Flux.just(searchResponse.getHits().getHits())); + .concatMap(searchResponse -> Flux.fromIterable(searchResponse.hits().hits())); } - public Flux<SearchResponse> searchResponses() { + private Flux<ScrollResponse<ObjectNode>> searchResponses() { return Flux.push(sink -> { AtomicReference<Optional<String>> scrollId = new AtomicReference<>(Optional.empty()); sink.onRequest(numberOfRequestedElements -> next(sink, scrollId, numberOfRequestedElements)); @@ -64,17 +66,16 @@ public class ScrolledSearch { }); } - private void next(FluxSink<SearchResponse> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) { + private void next(FluxSink<ScrollResponse<ObjectNode>> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) { if (numberOfRequestedElements <= 0) { return; } - Consumer<SearchResponse> onResponse = searchResponse -> { - scrollId.set(Optional.of(searchResponse.getScrollId())); + Consumer<ScrollResponse<ObjectNode>> onResponse = searchResponse -> { + scrollId.set(Optional.of(searchResponse.scrollId())); sink.next(searchResponse); - boolean noHitsLeft = searchResponse.getHits().getHits().length == 0; - if (noHitsLeft) { + if (searchResponse.hits().hits().isEmpty()) { sink.complete(); } else { next(sink, scrollId, numberOfRequestedElements - 1); @@ -87,22 +88,33 @@ public class ScrolledSearch { .subscribe(onResponse, onFailure); } - private Mono<SearchResponse> buildRequest(Optional<String> scrollId) { - return scrollId.map(id -> - client.scroll( - new SearchScrollRequest() - .scrollId(scrollId.get()) - .scroll(TIMEOUT), - RequestOptions.DEFAULT)) - .orElseGet(() -> client.search(searchRequest, RequestOptions.DEFAULT)); + private Mono<ScrollResponse<ObjectNode>> buildRequest(Optional<String> scrollId) { + return scrollId.map(Throwing.function(id -> client.scroll(new ScrollRequest.Builder() + .scrollId(scrollId.get()) + .scroll(TIMEOUT) + .build())).sneakyThrow()) + .orElseGet(() -> { + try { + return client.search(searchRequest) + .map(response -> new ScrollResponse.Builder<ObjectNode>() + .scrollId(response.scrollId()) + .hits(response.hits()) + .took(response.took()) + .timedOut(response.timedOut()) + .shards(response.shards()) + .build()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } public void close(AtomicReference<Optional<String>> scrollId) { - scrollId.get().map(id -> { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(id); - return clearScrollRequest; - }).ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe()).sneakyThrow()); + scrollId.get().map(id -> new ClearScrollRequest.Builder() + .scrollId(id) + .build()) + .ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> + client.clearScroll(clearScrollRequest).subscribe()).sneakyThrow()); } } diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java index 9434324172..9f89156f42 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java @@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit; import org.apache.james.backends.opensearch.OpenSearchClusterExtension.OpenSearchCluster; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch.core.SearchRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +74,9 @@ interface ClientProviderImplConnectionContract { default boolean isConnected(ClientProvider clientProvider) { try (ReactorOpenSearchClient client = clientProvider.get()) { client.search( - new SearchRequest() - .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))), - RequestOptions.DEFAULT).block(); + new SearchRequest.Builder() + .query(new MatchAllQuery.Builder().build()._toQuery()) + .build()).block(); return true; } catch (Exception e) { LOGGER.info("Caught exception while trying to connect", e); diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java index 34a105b5e6..021a60d790 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java @@ -19,18 +19,14 @@ package org.apache.james.backends.opensearch; -import java.time.Duration; - import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch.core.SearchRequest; import org.testcontainers.shaded.org.awaitility.Awaitility; public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachCallback, ParameterResolver { @@ -57,20 +53,20 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC } @Override - public void clean(DockerOpenSearch elasticSearch) { + public void clean(DockerOpenSearch openSearch) { Awaitility.await() .until(() -> { - elasticSearch.flushIndices(); - ReactorOpenSearchClient client = elasticSearch.clientProvider().get(); + openSearch.flushIndices(); + ReactorOpenSearchClient client = openSearch.clientProvider().get(); new DeleteByQueryPerformer(client, aliasName) - .perform(QueryBuilders.matchAllQuery()) + .perform(new MatchAllQuery.Builder().build()._toQuery()) .block(); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery())); - elasticSearch.flushIndices(); - boolean result = client.search(new SearchRequest(searchRequest), RequestOptions.DEFAULT) - .map(searchResponse -> searchResponse.getHits().getHits().length) + SearchRequest searchRequest = new SearchRequest.Builder() + .query(new MatchAllQuery.Builder().build()._toQuery()) + .build(); + openSearch.flushIndices(); + boolean result = client.search(searchRequest) + .map(searchResponse -> searchResponse.hits().hits().size()) .block() == 0; try { @@ -83,7 +79,7 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC } } - private final DockerOpenSearch elasticSearch = DockerOpenSearchSingleton.INSTANCE; + private final DockerOpenSearch openSearch = DockerOpenSearchSingleton.INSTANCE; private final CleanupStrategy cleanupStrategy; public DockerOpenSearchExtension() { @@ -96,13 +92,13 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC @Override public void afterEach(ExtensionContext context) { - cleanupStrategy.clean(elasticSearch); + cleanupStrategy.clean(openSearch); } @Override public void beforeEach(ExtensionContext extensionContext) { - if (!elasticSearch.isRunning()) { - elasticSearch.unpause(); + if (!openSearch.isRunning()) { + openSearch.unpause(); } awaitForOpenSearch(); } @@ -114,14 +110,14 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return elasticSearch; + return openSearch; } public void awaitForOpenSearch() { - elasticSearch.flushIndices(); + openSearch.flushIndices(); } public DockerOpenSearch getDockerOpenSearch() { - return elasticSearch; + return openSearch; } } diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java index 8cda09676f..2b169aaaa5 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java @@ -19,78 +19,77 @@ package org.apache.james.backends.opensearch; -import static org.apache.james.backends.opensearch.IndexCreationFactory.ANALYZER; -import static org.apache.james.backends.opensearch.IndexCreationFactory.TOKENIZER; -import static org.apache.james.backends.opensearch.IndexCreationFactory.TYPE; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.IOException; import java.util.Optional; -import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomElement; +import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomAnalyzer; +import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomTokenizer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.opensearch.OpenSearchStatusException; -import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.client.opensearch._types.analysis.Analyzer; +import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer; +import org.opensearch.client.opensearch._types.analysis.NGramTokenFilter; +import org.opensearch.client.opensearch._types.analysis.PatternTokenizer; +import org.opensearch.client.opensearch._types.analysis.TokenFilter; +import org.opensearch.client.opensearch._types.analysis.TokenFilterDefinition; +import org.opensearch.client.opensearch._types.analysis.Tokenizer; +import org.opensearch.client.opensearch._types.analysis.TokenizerDefinition; +import org.opensearch.client.opensearch.indices.IndexSettings; +import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis; class IndexCreationFactoryTest { private static final IndexName INDEX_NAME = new IndexName("index"); private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias"); - public static XContentBuilder getValidIndexSetting() throws IOException { - return jsonBuilder() - .startObject() - .startObject("settings") - .startObject("index") - .field("max_ngram_diff", 10) - .endObject() - .startObject("analysis") - .startObject(ANALYZER) - .startObject("email_ngram_filter_analyzer") - .field(TOKENIZER, "uax_url_email") - .startArray("filter") - .value("ngram_filter") - .endArray() - .endObject() - .endObject() - .startObject("filter") - .startObject("ngram_filter") - .field(TYPE, "ngram") - .field("min_gram", 3) - .field("max_gram", 13) - .endObject() - .endObject() - .endObject() - .endObject() - .endObject(); + public static IndexSettings getValidIndexSetting() { + return new IndexSettings.Builder() + .index(new IndexSettings.Builder() + .maxNgramDiff(10) + .build()) + .analysis(new IndexSettingsAnalysis.Builder() + .analyzer("email_ngram_filter_analyzer", new Analyzer.Builder() + .custom(generateAnalyzer()) + .build()) + .filter("ngram_filter", new TokenFilter.Builder() + .definition(new TokenFilterDefinition.Builder() + .ngram(generateFilter()) + .build()) + .build()) + .build()) + .build(); } - public static XContentBuilder getInvalidIndexSetting() throws IOException { - return jsonBuilder() - .startObject() - .startObject("settings") - .startObject("analysis") - .startObject(ANALYZER) - .startObject("email_ngram_filter_analyzer") - .field(TOKENIZER, "uax_url_email") - .startArray("filter") - .value("ngram_filter") - .endArray() - .endObject() - .endObject() - .startObject("filter") - .startObject("ngram_filter") - .field(TYPE, "ngram") - .field("min_gram", 3) - .field("max_gram", 13) - .endObject() - .endObject() - .endObject() - .endObject() - .endObject(); + public static IndexSettings getInvalidIndexSetting() { + return new IndexSettings.Builder() + .analysis(new IndexSettingsAnalysis.Builder() + .analyzer("email_ngram_filter_analyzer", new Analyzer.Builder() + .custom(generateAnalyzer()) + .build()) + .filter("ngram_filter", new TokenFilter.Builder() + .definition(new TokenFilterDefinition.Builder() + .ngram(generateFilter()) + .build()) + .build()) + .build()) + .build(); + } + + private static CustomAnalyzer generateAnalyzer() { + return new CustomAnalyzer.Builder() + .tokenizer("uax_url_email") + .filter("ngram_filter") + .build(); + } + + private static NGramTokenFilter generateFilter() { + return new NGramTokenFilter.Builder() + .minGram(3) + .maxGram(13) + .build(); } @RegisterExtension @@ -133,19 +132,14 @@ class IndexCreationFactoryTest { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .customAnalyzers(IndexCreationCustomElement.from("{" + - " \"my_custom_analyzer\": {" + - " \"type\": \"custom\"," + - " \"tokenizer\": \"standard\"," + - " \"char_filter\": [" + - " \"html_strip\"" + - " ]," + - " \"filter\": [" + - " \"lowercase\"," + - " \"asciifolding\"" + - " ]" + - " }" + - "}")) + .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer", + new Analyzer.Builder() + .custom(new CustomAnalyzer.Builder() + .tokenizer("standard") + .filter("lowercase", "asciifolding") + .charFilter("html_strip") + .build()) + .build())) .createIndexAndAliases(client); } @@ -155,14 +149,14 @@ class IndexCreationFactoryTest { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .customAnalyzers(IndexCreationCustomElement.from("{" + - " \"my_custom_analyzer\": {" + - " \"type\": \"invalid\"," + - " \"tokenizer\": \"not_Found_tokenizer\"" + - " }" + - "}")) + .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer", + new Analyzer.Builder() + .custom(new CustomAnalyzer.Builder() + .tokenizer("not_Found_tokenizer") + .build()) + .build())) .createIndexAndAliases(client)) - .isInstanceOf(OpenSearchStatusException.class); + .isInstanceOf(Exception.class); } @Test @@ -170,12 +164,16 @@ class IndexCreationFactoryTest { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .customTokenizers(IndexCreationCustomElement.from("{" + - " \"custom_tokenizer\": { " + - " \"type\": \"pattern\"," + - " \"pattern\": \"[ .,!?]\"" + - " }" + - " }")) + .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer", + new Tokenizer.Builder() + .definition(new TokenizerDefinition.Builder() + .pattern(new PatternTokenizer.Builder() + .pattern("[ .,!?]") + .flags("CASE_INSENSITIVE|COMMENTS") + .group(0) + .build()) + .build()) + .build())) .createIndexAndAliases(client); } @@ -185,14 +183,16 @@ class IndexCreationFactoryTest { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .customTokenizers(IndexCreationCustomElement.from("{" + - " \"custom_tokenizer\": { " + - " \"type\": \"invalidType\"," + - " \"pattern\": \"[ .,!?]\"" + - " }" + - " }")) + .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer", + new Tokenizer.Builder() + .definition(new TokenizerDefinition.Builder() + .pattern(new PatternTokenizer.Builder() + .pattern("[ .,!?]") + .build()) + .build()) + .build())) .createIndexAndAliases(client)) - .isInstanceOf(OpenSearchStatusException.class); + .isInstanceOf(Exception.class); } @Test @@ -200,25 +200,28 @@ class IndexCreationFactoryTest { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .customAnalyzers(IndexCreationCustomElement.from("{" + - " \"my_custom_analyzer\": { " + - " \"tokenizer\": \"custom_tokenizer\"," + - " \"filter\": [" + - " \"lowercase\"" + - " ]" + - " }" + - " }")) - .customTokenizers(IndexCreationCustomElement.from("{" + - " \"custom_tokenizer\": { " + - " \"type\": \"pattern\"," + - " \"pattern\": \"[ .,!?]\"" + - " }" + - " }")) + .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer", + new Analyzer.Builder() + .custom(new CustomAnalyzer.Builder() + .tokenizer("custom_tokenizer") + .filter("lowercase") + .build()) + .build())) + .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer", + new Tokenizer.Builder() + .definition(new TokenizerDefinition.Builder() + .pattern(new PatternTokenizer.Builder() + .pattern("[ .,!?]") + .flags("CASE_INSENSITIVE|COMMENTS") + .group(0) + .build()) + .build()) + .build())) .createIndexAndAliases(client); } @Test - void customIndexSettingShouldNotThrowWhenValidSetting() throws IOException { + void customIndexSettingShouldNotThrowWhenValidSetting() { new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) @@ -232,7 +235,7 @@ class IndexCreationFactoryTest { .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) .createIndexAndAliases(client, Optional.of(getInvalidIndexSetting()), Optional.empty())) - .isInstanceOf(OpenSearchStatusException.class); + .isInstanceOf(Exception.class); } } \ No newline at end of file diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java index 5bacd05967..1e40074c7f 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java @@ -22,17 +22,32 @@ import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.block.ClusterBlocks; -import org.opensearch.cluster.health.ClusterHealthStatus; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.client.opensearch._types.HealthStatus; +import org.opensearch.client.opensearch.cluster.HealthResponse; import com.google.common.collect.ImmutableSet; class OpenSearchHealthCheckTest { + private static HealthResponse fakeHealthResponse(HealthStatus status) { + return new HealthResponse.Builder() + .clusterName("fake-cluster") + .activePrimaryShards(0) + .activeShards(0) + .activeShardsPercentAsNumber("0") + .delayedUnassignedShards(0) + .initializingShards(0) + .numberOfDataNodes(0) + .numberOfInFlightFetch(0) + .numberOfNodes(0) + .numberOfPendingTasks(0) + .relocatingShards(0) + .taskMaxWaitingInQueueMillis(String.valueOf(System.currentTimeMillis())) + .timedOut(false) + .unassignedShards(0) + .status(status) + .build(); + } + private OpenSearchHealthCheck healthCheck; @BeforeEach @@ -42,39 +57,22 @@ class OpenSearchHealthCheckTest { @Test void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsGreen() { - FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.GREEN); + HealthResponse response = fakeHealthResponse(HealthStatus.Green); assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue(); } @Test void checkShouldReturnUnHealthyWhenOpenSearchClusterHealthStatusIsRed() { - FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.RED); + HealthResponse response = fakeHealthResponse(HealthStatus.Red); assertThat(healthCheck.toHealthCheckResult(response).isUnHealthy()).isTrue(); } @Test void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsYellow() { - FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.YELLOW); + HealthResponse response = fakeHealthResponse(HealthStatus.Yellow); assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue(); } - - private static class FakeClusterHealthResponse extends ClusterHealthResponse { - private final ClusterHealthStatus status; - - private FakeClusterHealthResponse(ClusterHealthStatus clusterHealthStatus) { - super("fake-cluster", new String[0], - new ClusterState(new ClusterName("fake-cluster"), 0, null, null, RoutingTable.builder().build(), - DiscoveryNodes.builder().build(), - ClusterBlocks.builder().build(), null, 0, false)); - this.status = clusterHealthStatus; - } - - @Override - public ClusterHealthStatus getStatus() { - return this.status; - } - } } diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java index 2c8ba75c6e..9af1bd3806 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; -import static org.opensearch.index.query.QueryBuilders.termQuery; import java.io.IOException; @@ -34,12 +33,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.client.opensearch._types.FieldValue; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch._types.query_dsl.MatchQuery; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch._types.query_dsl.TermQuery; +import org.opensearch.client.opensearch.core.GetResponse; +import org.opensearch.client.opensearch.core.SearchRequest; import com.google.common.collect.ImmutableList; @@ -79,32 +79,36 @@ class OpenSearchIndexerTest { } @Test - void indexMessageShouldWork() { + void indexMessageShouldWork() throws IOException { DocumentId documentId = DocumentId.fromString("1"); String content = "{\"message\": \"trying out Elasticsearch\"}"; testee.index(documentId, content, useDocumentId(documentId)).block(); - awaitForOpenSearch(QueryBuilders.matchQuery("message", "trying"), 1L); + awaitForOpenSearch(new MatchQuery.Builder() + .field("message") + .query(new FieldValue.Builder().stringValue("trying").build()) + .build() + ._toQuery(), 1L); } @Test void indexMessageShouldThrowWhenJsonIsNull() { - assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING)) + assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING).block()) .isInstanceOf(IllegalArgumentException.class); } @Test - void updateMessages() { + void updateMessages() throws IOException { String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}"; testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block(); - awaitForOpenSearch(QueryBuilders.matchQuery("message", "mastering"), 1L); + awaitForOpenSearch(new MatchQuery.Builder().field("message").query(new FieldValue.Builder().stringValue("mastering").build()).build()._toQuery(), 1L); - awaitForOpenSearch(QueryBuilders.matchQuery("field", "unchanged"), 1L); + awaitForOpenSearch(new MatchQuery.Builder().field("field").query(new FieldValue.Builder().stringValue("unchanged").build()).build()._toQuery(), 1L); } @Test @@ -136,21 +140,21 @@ class OpenSearchIndexerTest { } @Test - void deleteByQueryShouldWorkOnSingleMessage() { + void deleteByQueryShouldWorkOnSingleMessage() throws IOException { DocumentId documentId = DocumentId.fromString("1:2"); String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; RoutingKey routingKey = useDocumentId(documentId); testee.index(documentId, content, routingKey).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); - testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey).block(); + testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), routingKey).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L); } @Test - void deleteByQueryShouldWorkWhenMultipleMessages() { + void deleteByQueryShouldWorkWhenMultipleMessages() throws IOException { DocumentId documentId = DocumentId.fromString("1:1"); String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; @@ -165,28 +169,28 @@ class OpenSearchIndexerTest { String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}"; testee.index(documentId3, content3, ROUTING).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L); - testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING).block(); + testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), ROUTING).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); } @Test - void deleteMessage() { + void deleteMessage() throws IOException { DocumentId documentId = DocumentId.fromString("1:2"); String content = "{\"message\": \"trying out Elasticsearch\"}"; testee.index(documentId, content, useDocumentId(documentId)).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L); } @Test - void deleteShouldWorkWhenMultipleMessages() { + void deleteShouldWorkWhenMultipleMessages() throws IOException { DocumentId documentId = DocumentId.fromString("1:1"); String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}"; testee.index(documentId, content, ROUTING).block(); @@ -199,11 +203,11 @@ class OpenSearchIndexerTest { String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}"; testee.index(documentId3, content3, ROUTING).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L); testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); } @Test @@ -219,16 +223,16 @@ class OpenSearchIndexerTest { } @Test - void getShouldWork() { + void getShouldWork() throws IOException { DocumentId documentId = DocumentId.fromString("1"); String content = "{\"message\":\"trying out Elasticsearch\"}"; testee.index(documentId, content, useDocumentId(documentId)).block(); - awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L); + awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L); GetResponse getResponse = testee.get(documentId, useDocumentId(documentId)).block(); - assertThat(getResponse.getSourceAsString()).isEqualTo(content); + assertThat(getResponse.source().toString()).isEqualTo(content); } @Test @@ -243,13 +247,13 @@ class OpenSearchIndexerTest { .isInstanceOf(NullPointerException.class); } - private void awaitForOpenSearch(QueryBuilder query, long totalHits) { + private void awaitForOpenSearch(Query query, long totalHits) { CALMLY_AWAIT.atMost(Durations.TEN_SECONDS) .untilAsserted(() -> assertThat(client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(query)), - RequestOptions.DEFAULT) + new SearchRequest.Builder() + .query(query) + .build()) .block() - .getHits().getTotalHits().value).isEqualTo(totalHits)); + .hits().total().value()).isEqualTo(totalHits)); } } diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java index dcd10da200..9e3e32455c 100644 --- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java +++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import java.io.IOException; +import java.util.List; import org.apache.james.backends.opensearch.DockerOpenSearchExtension; import org.apache.james.backends.opensearch.IndexCreationFactory; @@ -36,18 +37,20 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.client.opensearch._types.Time; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.search.Hit; + +import com.fasterxml.jackson.databind.util.RawValue; class ScrolledSearchTest { - private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + private static final Time TIMEOUT = new Time.Builder() + .time("1m") + .build(); private static final int SIZE = 2; - private static final String MESSAGE = "message"; + private static final String CONTENT = "{\"message\": \"Sample message\"}"; private static final IndexName INDEX_NAME = new IndexName("index"); private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias"); @@ -74,119 +77,129 @@ class ScrolledSearchTest { @Test void scrollIterableShouldWorkWhenEmpty() { - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + SearchRequest searchRequest = new SearchRequest.Builder() + .index(INDEX_NAME.getValue()) .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); + .query(new MatchAllQuery.Builder().build()._toQuery()) + .size(SIZE) + .build(); assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) .isEmpty(); } @Test - void scrollIterableShouldWorkWhenOneElement() throws Exception { + void scrollIterableShouldWorkWhenOneElement() throws IOException { String id = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) - .block(); + .document(new RawValue(CONTENT)) + .build()) + .block(); openSearch.awaitForOpenSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id)); - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + SearchRequest searchRequest = new SearchRequest.Builder() + .index(INDEX_NAME.getValue()) .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); + .query(new MatchAllQuery.Builder().build()._toQuery()) + .size(SIZE) + .build(); assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) - .extracting(SearchHit::getId) + .extracting(Hit::id) .containsOnly(id); } @Test - void scrollIterableShouldWorkWhenSizeElement() { + void scrollIterableShouldWorkWhenSizeElement() throws IOException { String id1 = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id1) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) + .document(new RawValue(CONTENT)) + .build()) .block(); String id2 = "2"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id2) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) + .document(new RawValue(CONTENT)) + .build()) .block(); openSearch.awaitForOpenSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2)); - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + SearchRequest searchRequest = new SearchRequest.Builder() + .index(INDEX_NAME.getValue()) .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); + .query(new MatchAllQuery.Builder().build()._toQuery()) + .size(SIZE) + .build(); assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) - .extracting(SearchHit::getId) + .extracting(Hit::id) .containsOnly(id1, id2); } @Test - void scrollIterableShouldWorkWhenMoreThanSizeElement() { + void scrollIterableShouldWorkWhenMoreThanSizeElement() throws IOException { String id1 = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id1) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) + .document(new RawValue(CONTENT)) + .build()) .block(); String id2 = "2"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id2) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) + .document(new RawValue(CONTENT)) + .build()) .block(); String id3 = "3"; - client.index(new IndexRequest(INDEX_NAME.getValue()) + client.index(new IndexRequest.Builder<>() + .index(INDEX_NAME.getValue()) .id(id3) - .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT) + .document(new RawValue(CONTENT)) + .build()) .block(); openSearch.awaitForOpenSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3)); - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + SearchRequest searchRequest = new SearchRequest.Builder() + .index(INDEX_NAME.getValue()) .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); + .query(new MatchAllQuery.Builder().build()._toQuery()) + .size(SIZE) + .build(); assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) - .extracting(SearchHit::getId) + .extracting(Hit::id) .containsOnly(id1, id2, id3); } - private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) { - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) - .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery())); + private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) throws IOException { + SearchRequest searchRequest = new SearchRequest.Builder() + .index(INDEX_NAME.getValue()) + .query(new MatchAllQuery.Builder().build()._toQuery()) + .build(); - SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT) - .block() - .getHits() - .getHits(); + List<String> hitIds = client.search(searchRequest) + .flatMapIterable(response -> response.hits().hits()) + .map(Hit::id) + .collectList() + .block(); - assertThat(hits) - .extracting(SearchHit::getId) + assertThat(hitIds) .contains(ids); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org