This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e3298980db6c4fa3eadc64cdce958c325e4a71ae
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Jun 19 15:17:25 2019 +0200

    JAMES-2803 allow configuration of ES request timeout
---
 .../james/backends/es/ClientProviderImpl.java      |  47 +++------
 .../backends/es/ElasticSearchConfiguration.java    |  25 ++++-
 .../es/ClientProviderImplConnectionTest.java       |  30 +++---
 .../james/backends/es/ClientProviderImplTest.java  | 109 +--------------------
 .../james/backends/es/DockerElasticSearch.java     |   8 +-
 .../modules/mailbox/ElasticSearchClientModule.java |   2 +-
 .../apache/james/DockerElasticSearchExtension.java |  11 +++
 7 files changed, 71 insertions(+), 161 deletions(-)

diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
index 3ba32f0..7816e8d 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
@@ -18,63 +18,48 @@
  ****************************************************************/
 package org.apache.james.backends.es;
 
-import java.util.Optional;
-
 import org.apache.http.HttpHost;
-import org.apache.james.util.Host;
 import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.settings.Settings;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 public class ClientProviderImpl implements ClientProvider {
 
-    public static ClientProviderImpl forHost(String address, Integer port, 
Optional<String> clusterName) {
-        return new ClientProviderImpl(ImmutableList.of(Host.from(address, 
port)), clusterName);
-    }
-
-    public static ClientProviderImpl fromHostsString(String hostsString, 
Optional<String> clusterName) {
-        Preconditions.checkNotNull(hostsString, "HostString should not be 
null");
-        return new ClientProviderImpl(Host.parseHosts(hostsString), 
clusterName);
-    }
-
-    public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, 
Optional<String> clusterName) {
-        Preconditions.checkNotNull(hosts, "Hosts should not be null");
-        return new ClientProviderImpl(hosts, clusterName);
+    public static ClientProviderImpl 
fromConfiguration(ElasticSearchConfiguration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return new ClientProviderImpl(configuration);
     }
 
     private static final String CLUSTER_NAME_SETTING = "cluster.name";
     private static final String HTTP_HOST_SCHEME = "http";
+    private final ElasticSearchConfiguration configuration;
 
-    private final ImmutableList<Host> hosts;
-    private final Optional<String> clusterName;
-
-    private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> 
clusterName) {
-        Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at 
least one host");
-        this.hosts = hosts;
-        this.clusterName = clusterName;
+    private ClientProviderImpl(ElasticSearchConfiguration configuration) {
+        this.configuration = configuration;
     }
 
     private HttpHost[] hostsToHttpHosts() {
-        return hosts.stream()
+        return configuration.getHosts().stream()
             .map(host -> new HttpHost(host.getHostName(), host.getPort(), 
HTTP_HOST_SCHEME))
             .toArray(HttpHost[]::new);
     }
 
     @Override
     public RestHighLevelClient get() {
-        return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts()));
+        RestClientBuilder restClient = RestClient.builder(hostsToHttpHosts())
+            
.setMaxRetryTimeoutMillis(Math.toIntExact(configuration.getRequestTimeout().toMillis()));
+        return new RestHighLevelClient(restClient);
     }
 
     @VisibleForTesting Settings settings() {
-        if (clusterName.isPresent()) {
-            return Settings.builder()
-                    .put(CLUSTER_NAME_SETTING, clusterName.get())
-                    .build();
-        }
-        return Settings.EMPTY;
+        return configuration.getClusterName()
+            .map(clusterName -> Settings.builder()
+                .put(CLUSTER_NAME_SETTING, clusterName)
+                .build())
+            .orElse(Settings.EMPTY);
     }
 }
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
index a1dec4d..ed3199d 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backends.es;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -46,6 +47,7 @@ public class ElasticSearchConfiguration {
         private Optional<Integer> nbReplica;
         private Optional<Integer> minDelay;
         private Optional<Integer> maxRetries;
+        private Optional<Duration> requestTimeout;
 
         public Builder() {
             hosts = ImmutableList.builder();
@@ -54,6 +56,7 @@ public class ElasticSearchConfiguration {
             nbReplica = Optional.empty();
             minDelay = Optional.empty();
             maxRetries = Optional.empty();
+            requestTimeout = Optional.empty();
         }
 
         public Builder addHost(Host host) {
@@ -93,6 +96,11 @@ public class ElasticSearchConfiguration {
             return this;
         }
 
+        public Builder requestTimeout(Optional<Duration> requestTimeout) {
+            this.requestTimeout = requestTimeout;
+            return this;
+        }
+
         public ElasticSearchConfiguration build() {
             ImmutableList<Host> hosts = this.hosts.build();
             Preconditions.checkState(!hosts.isEmpty(), "You need to specify 
ElasticSearch host");
@@ -102,7 +110,8 @@ public class ElasticSearchConfiguration {
                 nbShards.orElse(DEFAULT_NB_SHARDS),
                 nbReplica.orElse(DEFAULT_NB_REPLICA),
                 minDelay.orElse(DEFAULT_CONNECTION_MIN_DELAY),
-                maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES));
+                maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES),
+                requestTimeout.orElse(DEFAULT_REQUEST_TIMEOUT));
         }
     }
 
@@ -121,6 +130,7 @@ public class ElasticSearchConfiguration {
 
     public static final int DEFAULT_CONNECTION_MAX_RETRIES = 7;
     public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
+    public static final Duration DEFAULT_REQUEST_TIMEOUT = 
Duration.ofSeconds(30);
     public static final int DEFAULT_NB_SHARDS = 5;
     public static final int DEFAULT_NB_REPLICA = 1;
     public static final int DEFAULT_PORT = 9200;
@@ -184,14 +194,16 @@ public class ElasticSearchConfiguration {
     private final int nbReplica;
     private final int minDelay;
     private final int maxRetries;
+    private final Duration requestTimeout;
 
-    private ElasticSearchConfiguration(ImmutableList<Host> hosts, 
Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int 
maxRetries) {
+    private ElasticSearchConfiguration(ImmutableList<Host> hosts, 
Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int 
maxRetries, Duration requestTimeout) {
         this.hosts = hosts;
         this.clusterName = clusterName;
         this.nbShards = nbShards;
         this.nbReplica = nbReplica;
         this.minDelay = minDelay;
         this.maxRetries = maxRetries;
+        this.requestTimeout = requestTimeout;
     }
 
     public ImmutableList<Host> getHosts() {
@@ -218,6 +230,10 @@ public class ElasticSearchConfiguration {
         return maxRetries;
     }
 
+    public Duration getRequestTimeout() {
+        return requestTimeout;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof ElasticSearchConfiguration) {
@@ -228,13 +244,14 @@ public class ElasticSearchConfiguration {
                 && Objects.equals(this.nbReplica, that.nbReplica)
                 && Objects.equals(this.minDelay, that.minDelay)
                 && Objects.equals(this.maxRetries, that.maxRetries)
-                && Objects.equals(this.hosts, that.hosts);
+                && Objects.equals(this.hosts, that.hosts)
+                && Objects.equals(this.requestTimeout, that.requestTimeout);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, 
maxRetries);
+        return Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, 
maxRetries, requestTimeout);
     }
 }
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
index 149aa07..72ef84a 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
@@ -19,9 +19,9 @@
 
 package org.apache.james.backends.es;
 
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.james.util.Host;
 import org.apache.james.util.docker.DockerGenericContainer;
 import org.apache.james.util.docker.Images;
 import org.awaitility.Awaitility;
@@ -53,22 +53,27 @@ public class ClientProviderImplConnectionTest {
 
     @Test
     public void connectingASingleServerShouldWork() {
+        ElasticSearchConfiguration configuration = 
ElasticSearchConfiguration.builder()
+            .addHost(Host.from(es1.getContainerIp(), ES_APPLICATIVE_PORT))
+            .build();
+
         Awaitility.await()
             .atMost(1, TimeUnit.MINUTES)
             .pollInterval(5, TimeUnit.SECONDS)
-            .until(() -> 
isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 
ES_APPLICATIVE_PORT, Optional.empty())));
+            .until(() -> 
isConnected(ClientProviderImpl.fromConfiguration(configuration)));
     }
 
     @Test
     public void connectingAClusterShouldWork() {
+        ElasticSearchConfiguration configuration = 
ElasticSearchConfiguration.builder()
+            .addHost(Host.from(es1.getContainerIp(), ES_APPLICATIVE_PORT))
+            .addHost(Host.from(es2.getContainerIp(), ES_APPLICATIVE_PORT))
+            .build();
+
         Awaitility.await()
             .atMost(1, TimeUnit.MINUTES)
             .pollInterval(5, TimeUnit.SECONDS)
-            .until(() -> isConnected(
-                ClientProviderImpl.fromHostsString(
-                    es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
-                        + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT,
-                    Optional.empty())));
+            .until(() -> 
isConnected(ClientProviderImpl.fromConfiguration(configuration)));
     }
 
     @Test
@@ -77,14 +82,15 @@ public class ClientProviderImplConnectionTest {
         String es2Ip = es2.getContainerIp();
         es2.stop();
 
+        ElasticSearchConfiguration configuration = 
ElasticSearchConfiguration.builder()
+            .addHost(Host.from(es1Ip, ES_APPLICATIVE_PORT))
+            .addHost(Host.from(es2Ip, ES_APPLICATIVE_PORT))
+            .build();
+
         Awaitility.await()
             .atMost(1, TimeUnit.MINUTES)
             .pollInterval(5, TimeUnit.SECONDS)
-            .until(() -> isConnected(
-                ClientProviderImpl.fromHostsString(
-                    es1Ip + ":" + ES_APPLICATIVE_PORT + ","
-                        + es2Ip + ":" + ES_APPLICATIVE_PORT,
-                    Optional.empty())));
+            .until(() -> 
isConnected(ClientProviderImpl.fromConfiguration(configuration)));
     }
 
     private boolean isConnected(ClientProvider clientProvider) {
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
index dac6ce3..949ad74 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
@@ -30,113 +30,8 @@ import org.junit.Test;
 public class ClientProviderImplTest {
 
     @Test
-    public void fromHostsStringShouldThrowOnNullString() {
-        assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(null, 
Optional.empty()))
+    public void fromConfigurationShouldThrowOnNull() {
+        assertThatThrownBy(() -> ClientProviderImpl.fromConfiguration(null))
                 .isInstanceOf(NullPointerException.class);
     }
-
-    @Test
-    public void fromHostsStringShouldThrowOnEmptyString() {
-        assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("", 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void forHostShouldThrowOnNullHost() {
-        assertThatThrownBy(() -> ClientProviderImpl.forHost(null, 9200, 
Optional.empty()))
-                .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    public void forHostShouldThrowOnEmptyHost() {
-        assertThatThrownBy(() -> ClientProviderImpl.forHost("", 9200, 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void forHostShouldThrowOnNegativePort() {
-        assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", -1, 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void forHostShouldThrowOnZeroPort() {
-        assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 0, 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void forHostShouldThrowOnTooBigPort() {
-        assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 
65536, Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldEmptyAddress() {
-        assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(":9200", 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnAbsentPort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost", Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowWhenTooMuchParts() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:9200:9200", Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnEmptyPort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:", Optional.empty()))
-                .isInstanceOf(NumberFormatException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnInvalidPort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:invalid", Optional.empty()))
-                .isInstanceOf(NumberFormatException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnNegativePort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnZeroPort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:0", Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowOnTooBigPort() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:65536", Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void fromHostsStringShouldThrowIfOneHostIsInvalid() {
-        assertThatThrownBy(() -> 
ClientProviderImpl.fromHostsString("localhost:9200,localhost", 
Optional.empty()))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void settingsShouldBeEmptyWhenClusterNameIsEmpty() {
-        ClientProviderImpl clientProvider = 
ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty());
-
-        assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY);
-    }
-
-    @Test
-    public void 
settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() {
-        String clusterName = "myClusterName";
-        ClientProviderImpl clientProvider = 
ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName));
-
-        
assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName);
-    }
 }
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
index e2d5f78..82eb83b 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
@@ -21,8 +21,6 @@ package org.apache.james.backends.es;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.Optional;
-
 import org.apache.http.HttpStatus;
 import org.apache.james.util.Host;
 import org.apache.james.util.docker.DockerGenericContainer;
@@ -30,9 +28,7 @@ import org.apache.james.util.docker.Images;
 import org.apache.james.util.docker.RateLimiters;
 import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-
 import feign.Feign;
 import feign.Logger;
 import feign.RequestLine;
@@ -117,8 +113,8 @@ public class DockerElasticSearch {
     }
 
     public ClientProvider clientProvider() {
-        Optional<String> noClusterName = Optional.empty();
-        return ClientProviderImpl.fromHosts(ImmutableList.of(getHttpHost()), 
noClusterName);
+        ElasticSearchConfiguration configuration = 
ElasticSearchConfiguration.builder().addHost(getHttpHost()).build();
+        return ClientProviderImpl.fromConfiguration(configuration);
     }
 
     private ElasticSearchAPI esAPI() {
diff --git 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
index e485fee..020a365 100644
--- 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
+++ 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
@@ -62,7 +62,7 @@ public class ElasticSearchClientModule extends AbstractModule 
{
     private RestHighLevelClient connectToCluster(ElasticSearchConfiguration 
configuration) throws IOException {
         LOGGER.info("Trying to connect to ElasticSearch service at {}", 
LocalDateTime.now());
 
-        return ClientProviderImpl.fromHosts(configuration.getHosts(), 
configuration.getClusterName())
+        return ClientProviderImpl.fromConfiguration(configuration)
             .get();
     }
 }
diff --git 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java
 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java
index 531bf13..7ff07f4 100644
--- 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java
+++ 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java
@@ -19,6 +19,9 @@
 
 package org.apache.james;
 
+import java.time.Duration;
+import java.util.Optional;
+
 import org.apache.james.backends.es.DockerElasticSearch;
 import org.apache.james.backends.es.DockerElasticSearchSingleton;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
@@ -29,13 +32,20 @@ import com.google.inject.Module;
 public class DockerElasticSearchExtension implements GuiceModuleTestExtension {
 
     private final DockerElasticSearch dockerElasticSearch;
+    private Optional<Duration> requestTimeout;
 
     public DockerElasticSearchExtension() {
         this(DockerElasticSearchSingleton.INSTANCE);
     }
 
+    public DockerElasticSearchExtension withRequestTimeout(Duration 
requestTimeout) {
+        this.requestTimeout = Optional.of(requestTimeout);
+        return this;
+    }
+
     public DockerElasticSearchExtension(DockerElasticSearch 
dockerElasticSearch) {
         this.dockerElasticSearch = dockerElasticSearch;
+        requestTimeout = Optional.empty();
     }
 
     @Override
@@ -61,6 +71,7 @@ public class DockerElasticSearchExtension implements 
GuiceModuleTestExtension {
     private ElasticSearchConfiguration 
getElasticSearchConfigurationForDocker() {
         return ElasticSearchConfiguration.builder()
             .addHost(getDockerES().getHttpHost())
+            .requestTimeout(requestTimeout)
             .build();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to