This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit be0233b511a8b7ac02265979c0dbba328775f615 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Wed Jun 19 16:25:14 2019 +0200 JAMES-2803 Always close ES connections --- .../apache/james/backends/es/ClientProvider.java | 68 +++++++- .../james/backends/es/ClientProviderImpl.java | 54 ------ .../es/ClientProviderImplConnectionTest.java | 6 +- ...oviderImplTest.java => ClientProviderTest.java} | 10 +- .../james/backends/es/DockerElasticSearch.java | 7 +- .../james/backends/es/DockerElasticSearchRule.java | 2 +- .../backends/es/ElasticSearchIndexerTest.java | 90 +++++----- .../backends/es/IndexCreationFactoryTest.java | 17 +- .../james/backends/es/NodeMappingFactoryTest.java | 23 ++- .../backends/es/search/ScrolledSearchTest.java | 181 ++++++++++----------- .../ElasticSearchIntegrationTest.java | 13 +- ...lasticSearchQuotaSearchTestSystemExtension.java | 13 +- .../ElasticSearchQuotaMailboxListenerTest.java | 6 + .../host/ElasticSearchHostSystem.java | 7 +- .../modules/mailbox/ElasticSearchClientModule.java | 40 +---- .../modules/TestDockerElasticSearchModule.java | 16 +- .../routes/ElasticSearchQuotaSearchExtension.java | 9 +- 17 files changed, 276 insertions(+), 286 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java index 2a3aba1..9fb7f1a 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -18,9 +18,73 @@ ****************************************************************/ package org.apache.james.backends.es; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Provider; + +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ClientProvider implements Provider<RestHighLevelClient> { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); + + private static final String HTTP_HOST_SCHEME = "http"; + private final ElasticSearchConfiguration configuration; + private final RestHighLevelClient client; + + @Inject + @VisibleForTesting + ClientProvider(ElasticSearchConfiguration configuration) { + this.configuration = configuration; + this.client = connect(configuration); + } + + private RestHighLevelClient connect(ElasticSearchConfiguration configuration) { + Duration waitDelay = Duration.ofMillis(configuration.getMinDelay()); + Duration forever = Duration.ofMillis(Long.MAX_VALUE); + boolean suppressLeadingZeroElements = true; + boolean suppressTrailingZeroElements = true; + return Mono.fromCallable(() -> connectToCluster(configuration)) + .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}", + DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e)) + .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic()) + .publishOn(Schedulers.elastic()) + .block(); + } + + private RestHighLevelClient connectToCluster(ElasticSearchConfiguration configuration) throws IOException { + LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now()); + return new RestHighLevelClient( + RestClient.builder(hostsToHttpHosts()) + .setMaxRetryTimeoutMillis(Math.toIntExact(configuration.getRequestTimeout().toMillis()))); + } + + private HttpHost[] hostsToHttpHosts() { + return configuration.getHosts().stream() + .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME)) + .toArray(HttpHost[]::new); + } -public interface ClientProvider { + @Override + public RestHighLevelClient get() { + return client; + } - RestHighLevelClient get(); + @PreDestroy + public void close() throws IOException { + client.close(); + } } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java deleted file mode 100644 index 98611df..0000000 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backends.es; - -import org.apache.http.HttpHost; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; - -import com.google.common.base.Preconditions; - -public class ClientProviderImpl implements ClientProvider { - - public static ClientProviderImpl fromConfiguration(ElasticSearchConfiguration configuration) { - Preconditions.checkNotNull(configuration); - return new ClientProviderImpl(configuration); - } - - private static final String HTTP_HOST_SCHEME = "http"; - private final ElasticSearchConfiguration configuration; - - private ClientProviderImpl(ElasticSearchConfiguration configuration) { - this.configuration = configuration; - } - - private HttpHost[] hostsToHttpHosts() { - return configuration.getHosts().stream() - .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME)) - .toArray(HttpHost[]::new); - } - - @Override - public RestHighLevelClient get() { - RestClientBuilder restClient = RestClient.builder(hostsToHttpHosts()) - .setMaxRetryTimeoutMillis(Math.toIntExact(configuration.getRequestTimeout().toMillis())); - return new RestHighLevelClient(restClient); - } -} diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java index 72ef84a..516221a 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java @@ -60,7 +60,7 @@ public class ClientProviderImplConnectionTest { Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); + .until(() -> isConnected(new ClientProvider(configuration))); } @Test @@ -73,7 +73,7 @@ public class ClientProviderImplConnectionTest { Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); + .until(() -> isConnected(new ClientProvider(configuration))); } @Test @@ -90,7 +90,7 @@ public class ClientProviderImplConnectionTest { Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); + .until(() -> isConnected(new ClientProvider(configuration))); } private boolean isConnected(ClientProvider clientProvider) { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderTest.java similarity index 81% rename from backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java rename to backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderTest.java index 949ad74..aef7c35 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderTest.java @@ -19,19 +19,15 @@ package org.apache.james.backends.es; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.util.Optional; - -import org.elasticsearch.common.settings.Settings; import org.junit.Test; -public class ClientProviderImplTest { +public class ClientProviderTest { @Test - public void fromConfigurationShouldThrowOnNull() { - assertThatThrownBy(() -> ClientProviderImpl.fromConfiguration(null)) + public void constructorShouldThrowOnNull() { + assertThatThrownBy(() -> new ClientProvider(null)) .isInstanceOf(NullPointerException.class); } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java index 82eb83b..8441a99 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java @@ -112,9 +112,12 @@ public class DockerElasticSearch { } } + public ElasticSearchConfiguration configuration() { + return ElasticSearchConfiguration.builder().addHost(getHttpHost()).build(); + } + public ClientProvider clientProvider() { - ElasticSearchConfiguration configuration = ElasticSearchConfiguration.builder().addHost(getHttpHost()).build(); - return ClientProviderImpl.fromConfiguration(configuration); + return new ClientProvider(configuration()); } private ElasticSearchAPI esAPI() { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java index 89233a1..a7de917 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java @@ -38,7 +38,7 @@ public class DockerElasticSearchRule extends ExternalResource { public ClientProvider clientProvider() { return dockerElasticSearch.clientProvider(); } - + public void awaitForElasticSearch() { dockerElasticSearch.flushIndices(); } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java index 34abc24..08e4e4f 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java @@ -24,6 +24,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import java.io.IOException; + import org.awaitility.Awaitility; import org.awaitility.Duration; import org.awaitility.core.ConditionFactory; @@ -32,6 +34,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -52,19 +55,21 @@ public class ElasticSearchIndexerTest { @Rule public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); private ElasticSearchIndexer testee; + private RestHighLevelClient client; @Before public void setup() { + client = elasticSearch.clientProvider().get(); new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .createIndexAndAliases(getESClient()); - testee = new ElasticSearchIndexer(getESClient(), - ALIAS_NAME, MINIMUM_BATCH_SIZE); + .createIndexAndAliases(client); + testee = new ElasticSearchIndexer(client, ALIAS_NAME, MINIMUM_BATCH_SIZE); } - private RestHighLevelClient getESClient() { - return elasticSearch.clientProvider().get(); + @After + public void tearDown() throws IOException { + client.close(); } @Test @@ -75,12 +80,10 @@ public class ElasticSearchIndexerTest { testee.index(messageId, content); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - SearchResponse searchResponse = client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying")))); - assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); - } + SearchResponse searchResponse = client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying")))); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); } @Test @@ -100,19 +103,16 @@ public class ElasticSearchIndexerTest { testee.update(ImmutableList.of(new UpdatedRepresentation(messageId, "{\"message\": \"mastering out Elasticsearch\"}"))); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - SearchResponse searchResponse = client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering")))); - assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); - } - try (RestHighLevelClient client = getESClient()) { - SearchResponse searchResponse = client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged")))); - assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); - } + SearchResponse searchResponse = client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering")))); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + + SearchResponse searchResponse2 = client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged")))); + assertThat(searchResponse2.getHits().getTotalHits()).isEqualTo(1); } @Test @@ -150,13 +150,11 @@ public class ElasticSearchIndexerTest { testee.deleteAllMatchingQuery(termQuery("property", "1")); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - CALMLY_AWAIT.atMost(Duration.TEN_SECONDS) - .until(() -> client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))) - .getHits().getTotalHits() == 0); - } + CALMLY_AWAIT.atMost(Duration.TEN_SECONDS) + .until(() -> client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))) + .getHits().getTotalHits() == 0); } @Test @@ -180,13 +178,11 @@ public class ElasticSearchIndexerTest { testee.deleteAllMatchingQuery(termQuery("property", "1")); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - CALMLY_AWAIT.atMost(Duration.TEN_SECONDS) - .until(() -> client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))) - .getHits().getTotalHits() == 1); - } + CALMLY_AWAIT.atMost(Duration.TEN_SECONDS) + .until(() -> client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))) + .getHits().getTotalHits() == 1); } @Test @@ -200,12 +196,10 @@ public class ElasticSearchIndexerTest { testee.delete(ImmutableList.of(messageId)); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - SearchResponse searchResponse = client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))); - assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0); - } + SearchResponse searchResponse = client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0); } @Test @@ -229,12 +223,10 @@ public class ElasticSearchIndexerTest { testee.delete(ImmutableList.of(messageId, messageId3)); elasticSearch.awaitForElasticSearch(); - try (RestHighLevelClient client = getESClient()) { - SearchResponse searchResponse = client.search( - new SearchRequest(INDEX_NAME.getValue()) - .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))); - assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); - } + SearchResponse searchResponse = client.search( + new SearchRequest(INDEX_NAME.getValue()) + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); } @Test diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java index 50babf5..f204f2c 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java @@ -21,6 +21,10 @@ package org.apache.james.backends.es; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.IOException; + +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -31,15 +35,20 @@ public class IndexCreationFactoryTest { @Rule public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); - private ClientProvider clientProvider; + private RestHighLevelClient client; @Before public void setUp() { - clientProvider = elasticSearch.clientProvider(); + client = elasticSearch.clientProvider().get(); new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .createIndexAndAliases(clientProvider.get()); + .createIndexAndAliases(client); + } + + @After + public void tearDown() throws IOException { + client.close(); } @Test @@ -47,7 +56,7 @@ public class IndexCreationFactoryTest { new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .createIndexAndAliases(clientProvider.get()); + .createIndexAndAliases(client); } @Test diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java index dd24ddd..e78c970 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java @@ -22,7 +22,11 @@ package org.apache.james.backends.es; import static org.assertj.core.api.Assertions.assertThatCode; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import java.io.IOException; + +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -34,36 +38,41 @@ public class NodeMappingFactoryTest { @Rule public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); - private ClientProvider clientProvider; + private RestHighLevelClient client; @Before public void setUp() throws Exception { - clientProvider = elasticSearch.clientProvider(); + client = elasticSearch.getDockerElasticSearch().clientProvider().get(); new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .createIndexAndAliases(clientProvider.get()); - NodeMappingFactory.applyMapping(clientProvider.get(), + .createIndexAndAliases(client); + NodeMappingFactory.applyMapping(client, INDEX_NAME, getMappingsSources()); } + @After + public void tearDown() throws IOException { + client.close(); + } + @Test public void applyMappingShouldNotThrowWhenCalledSeveralTime() throws Exception { - NodeMappingFactory.applyMapping(clientProvider.get(), + NodeMappingFactory.applyMapping(client, INDEX_NAME, getMappingsSources()); } @Test public void applyMappingShouldNotThrowWhenIncrementalChanges() throws Exception { - NodeMappingFactory.applyMapping(clientProvider.get(), + NodeMappingFactory.applyMapping(client, INDEX_NAME, getMappingsSources()); elasticSearch.awaitForElasticSearch(); - assertThatCode(() -> NodeMappingFactory.applyMapping(clientProvider.get(), + assertThatCode(() -> NodeMappingFactory.applyMapping(client, INDEX_NAME, getOtherMappingsSources())) .doesNotThrowAnyException(); diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java index f622a63..49549af 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java @@ -24,7 +24,6 @@ import static org.awaitility.Awaitility.await; import java.io.IOException; -import org.apache.james.backends.es.ClientProvider; import org.apache.james.backends.es.DockerElasticSearchRule; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.IndexCreationFactory; @@ -40,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -55,120 +55,117 @@ public class ScrolledSearchTest { @Rule public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); - private ClientProvider clientProvider; + private RestHighLevelClient client; @Before public void setUp() { - clientProvider = elasticSearch.clientProvider(); + client = elasticSearch.clientProvider().get(); new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) .useIndex(INDEX_NAME) .addAlias(ALIAS_NAME) - .createIndexAndAliases(clientProvider.get()); + .createIndexAndAliases(client); elasticSearch.awaitForElasticSearch(); } + @After + public void tearDown() throws IOException { + client.close(); + } + @Test public void scrollIterableShouldWorkWhenEmpty() throws Exception { - try (RestHighLevelClient client = clientProvider.get()) { - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) - .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); - - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) - .isEmpty(); - } + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .isEmpty(); } @Test public void scrollIterableShouldWorkWhenOneElement() throws Exception { - try (RestHighLevelClient client = clientProvider.get()) { - String id = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id) - .source(MESSAGE, "Sample message")); - - elasticSearch.awaitForElasticSearch(); - WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id)); - - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) - .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); - - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) - .extracting(SearchHit::getId) - .containsOnly(id); - } + String id = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id) + .source(MESSAGE, "Sample message")); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) + .containsOnly(id); } @Test public void scrollIterableShouldWorkWhenSizeElement() throws Exception { - try (RestHighLevelClient client = clientProvider.get()) { - String id1 = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id1) - .source(MESSAGE, "Sample message")); - - String id2 = "2"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id2) - .source(MESSAGE, "Sample message")); - - elasticSearch.awaitForElasticSearch(); - WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2)); - - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) - .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); - - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) - .extracting(SearchHit::getId) - .containsOnly(id1, id2); - } + String id1 = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id1) + .source(MESSAGE, "Sample message")); + + String id2 = "2"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id2) + .source(MESSAGE, "Sample message")); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) + .containsOnly(id1, id2); } @Test public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception { - try (RestHighLevelClient client = clientProvider.get()) { - String id1 = "1"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id1) - .source(MESSAGE, "Sample message")); - - String id2 = "2"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id2) - .source(MESSAGE, "Sample message")); - - String id3 = "3"; - client.index(new IndexRequest(INDEX_NAME.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id3) - .source(MESSAGE, "Sample message")); - - elasticSearch.awaitForElasticSearch(); - WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3)); - - SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) - .scroll(TIMEOUT) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .size(SIZE)); - - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) - .extracting(SearchHit::getId) - .containsOnly(id1, id2, id3); - } + String id1 = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id1) + .source(MESSAGE, "Sample message")); + + String id2 = "2"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id2) + .source(MESSAGE, "Sample message")); + + String id3 = "3"; + client.index(new IndexRequest(INDEX_NAME.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id3) + .source(MESSAGE, "Sample message")); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) + .containsOnly(id1, id2, id3); } private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException { diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java index d32d51b..0e02175 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java @@ -50,6 +50,7 @@ import org.apache.james.mailbox.tika.TikaTextExtractor; import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.mime4j.dom.Message; import org.elasticsearch.client.RestHighLevelClient; +import org.junit.After; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -67,6 +68,7 @@ public class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest @Rule public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); private TikaTextExtractor textExtractor; + private RestHighLevelClient client; @Override public void setUp() throws Exception { @@ -79,6 +81,11 @@ public class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest super.setUp(); } + @After + public void tearDown() throws IOException { + client.close(); + } + @Override protected void await() { elasticSearch.awaitForElasticSearch(); @@ -86,11 +93,9 @@ public class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest @Override protected void initializeMailboxManager() throws IOException { - RestHighLevelClient client = MailboxIndexCreationUtil.prepareDefaultClient( + client = MailboxIndexCreationUtil.prepareDefaultClient( elasticSearch.clientProvider().get(), - ElasticSearchConfiguration.builder() - .addHost(elasticSearch.getDockerElasticSearch().getHttpHost()) - .build()); + elasticSearch.getDockerElasticSearch().configuration()); InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory(); diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java index 55dbf0b..9bfe5ac 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java @@ -21,6 +21,9 @@ package org.apache.james.quota.search.elasticsearch; import static org.mockito.Mockito.mock; +import java.io.IOException; + +import org.apache.james.backends.es.ClientProvider; import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchConfiguration; @@ -44,6 +47,7 @@ import org.junit.jupiter.api.extension.ParameterResolver; public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE; + private RestHighLevelClient client; @Override public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { @@ -53,11 +57,9 @@ public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterRes @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { try { - RestHighLevelClient client = QuotaSearchIndexCreationUtil.prepareDefaultClient( + client = QuotaSearchIndexCreationUtil.prepareDefaultClient( elasticSearch.clientProvider().get(), - ElasticSearchConfiguration.builder() - .addHost(elasticSearch.getHttpHost()) - .build()); + elasticSearch.configuration()); InMemoryIntegrationResources resources = InMemoryIntegrationResources.defaultResources(); @@ -98,7 +100,8 @@ public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterRes } @Override - public void afterEach(ExtensionContext context) { + public void afterEach(ExtensionContext context) throws IOException { + client.close(); elasticSearch.cleanUpData(); } } diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java index ef5ef59..69e55e8 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -72,6 +73,11 @@ public class ElasticSearchQuotaMailboxListenerTest { new QuotaRatioToElasticSearchJson()); } + @After + public void tearDown() throws IOException { + client.close(); + } + @Test public void deserializeElasticSearchQuotaMailboxListenerGroup() throws Exception { assertThat(Group.deserialize("org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener$ElasticSearchQuotaMailboxListenerGroup")) diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java index e0f3b92..f191c30 100644 --- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java +++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java @@ -19,6 +19,7 @@ package org.apache.james.mpt.imapmailbox.elasticsearch.host; +import java.io.IOException; import java.time.ZoneId; import org.apache.commons.lang.NotImplementedException; @@ -61,6 +62,7 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem { private DockerElasticSearch dockerElasticSearch; private StoreMailboxManager mailboxManager; + private RestHighLevelClient client; @Override public void beforeTest() throws Exception { @@ -71,12 +73,13 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem { } @Override - public void afterTest() { + public void afterTest() throws IOException { + client.close(); dockerElasticSearch.cleanUpData(); } private void initFields() throws Exception { - RestHighLevelClient client = MailboxIndexCreationUtil.prepareDefaultClient( + client = MailboxIndexCreationUtil.prepareDefaultClient( dockerElasticSearch.clientProvider().get(), ElasticSearchConfiguration.builder() .addHost(dockerElasticSearch.getHttpHost()) diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java index 020a365..d6a6c11 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java @@ -19,50 +19,18 @@ package org.apache.james.modules.mailbox; -import java.io.IOException; -import java.time.Duration; -import java.time.LocalDateTime; - -import javax.inject.Singleton; - -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.james.backends.es.ClientProviderImpl; -import org.apache.james.backends.es.ElasticSearchConfiguration; +import org.apache.james.backends.es.ClientProvider; import org.elasticsearch.client.RestHighLevelClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.inject.AbstractModule; -import com.google.inject.Provides; - -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import com.google.inject.Scopes; public class ElasticSearchClientModule extends AbstractModule { - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchClientModule.class); - @Override protected void configure() { + bind(ClientProvider.class).in(Scopes.SINGLETON); + bind(RestHighLevelClient.class).toProvider(ClientProvider.class); } - @Provides - @Singleton - protected RestHighLevelClient provideClient(ElasticSearchConfiguration configuration) { - Duration waitDelay = Duration.ofMillis(configuration.getMinDelay()); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); - return Mono.fromCallable(() -> connectToCluster(configuration)) - .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}", - DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), true, true), e)) - .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic()) - .publishOn(Schedulers.elastic()) - .block(); - } - - private RestHighLevelClient connectToCluster(ElasticSearchConfiguration configuration) throws IOException { - LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now()); - - return ClientProviderImpl.fromConfiguration(configuration) - .get(); - } } diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/TestDockerElasticSearchModule.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/TestDockerElasticSearchModule.java index 70e9362..fe87e53 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/TestDockerElasticSearchModule.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/TestDockerElasticSearchModule.java @@ -19,18 +19,11 @@ package org.apache.james.modules; -import java.io.IOException; - -import javax.inject.Singleton; - import org.apache.james.CleanupTasksPerformer; import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.ElasticSearchConfiguration; -import org.apache.james.mailbox.elasticsearch.MailboxIndexCreationUtil; -import org.elasticsearch.client.RestHighLevelClient; import com.google.inject.AbstractModule; -import com.google.inject.Provides; import com.google.inject.multibindings.Multibinder; public class TestDockerElasticSearchModule extends AbstractModule { @@ -59,17 +52,10 @@ public class TestDockerElasticSearchModule extends AbstractModule { @Override protected void configure() { + bind(ElasticSearchConfiguration.class).toInstance(elasticSearch.configuration()); Multibinder.newSetBinder(binder(), CleanupTasksPerformer.CleanupTask.class) .addBinding() .toInstance(new ESContainerCleanUp(elasticSearch)); } - @Provides - @Singleton - protected RestHighLevelClient provideClientProvider() throws IOException { - RestHighLevelClient client = elasticSearch.clientProvider().get(); - return MailboxIndexCreationUtil.prepareDefaultClient(client, ElasticSearchConfiguration.builder() - .addHost(elasticSearch.getHttpHost()) - .build()); - } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java index f062b60..a469985 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java @@ -21,6 +21,8 @@ package org.apache.james.webadmin.routes; import static org.mockito.Mockito.mock; +import java.io.IOException; + import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchConfiguration; @@ -50,6 +52,7 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE; private WebAdminQuotaSearchTestSystem restQuotaSearchTestSystem; private TemporaryFolder temporaryFolder = new TemporaryFolder(); + private RestHighLevelClient client; @Override public void beforeEach(ExtensionContext context) { @@ -57,7 +60,7 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef temporaryFolder.create(); elasticSearch.start(); - RestHighLevelClient client = QuotaSearchIndexCreationUtil.prepareDefaultClient( + client = QuotaSearchIndexCreationUtil.prepareDefaultClient( elasticSearch.clientProvider().get(), ElasticSearchConfiguration.builder() .addHost(elasticSearch.getHttpHost()) @@ -98,9 +101,9 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef } @Override - public void afterEach(ExtensionContext context) { + public void afterEach(ExtensionContext context) throws IOException { restQuotaSearchTestSystem.getWebAdminServer().destroy(); - + client.close(); elasticSearch.cleanUpData(); temporaryFolder.delete(); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org