JAMES-2592 Add cluster.name in ElasticSearch client settings
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/987be509 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/987be509 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/987be509 Branch: refs/heads/master Commit: 987be509263c6f32188860f3fdef077afcfa0b1d Parents: 644266c Author: Antoine Duprat <adup...@linagora.com> Authored: Tue Nov 13 16:24:53 2018 +0100 Committer: Benoit Tellier <btell...@linagora.com> Committed: Thu Nov 15 09:05:42 2018 +0700 ---------------------------------------------------------------------- .../james/backends/es/ClientProviderImpl.java | 34 ++++++++++--- .../es/ClientProviderImplConnectionTest.java | 13 +++-- .../backends/es/ClientProviderImplTest.java | 52 ++++++++++++++------ .../mailbox/ElasticSearchMailboxModule.java | 2 +- .../apache/james/metric/es/ESReporterTest.java | 3 +- 5 files changed, 73 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java index 317c0b7..8d92ae9 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java @@ -19,44 +19,53 @@ package org.apache.james.backends.es; import java.net.InetAddress; +import java.util.Optional; import org.apache.james.util.Host; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.consumers.ConsumerChainer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; public class ClientProviderImpl implements ClientProvider { - public static ClientProviderImpl forHost(String address, Integer port) { - return new ClientProviderImpl(ImmutableList.of(Host.from(address, port))); + public static ClientProviderImpl forHost(String address, Integer port, Optional<String> clusterName) { + return new ClientProviderImpl(ImmutableList.of(Host.from(address, port)), clusterName); } - public static ClientProviderImpl fromHostsString(String hostsString) { + public static ClientProviderImpl fromHostsString(String hostsString, Optional<String> clusterName) { Preconditions.checkNotNull(hostsString, "HostString should not be null"); - return new ClientProviderImpl(Host.parseHosts(hostsString)); + return new ClientProviderImpl(Host.parseHosts(hostsString), clusterName); } - public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts) { + public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, Optional<String> clusterName) { Preconditions.checkNotNull(hosts, "Hosts should not be null"); - return new ClientProviderImpl(hosts); + return new ClientProviderImpl(hosts, clusterName); } + private static final String CLUSTER_NAME_SETTING = "cluster.name"; + private final ImmutableList<Host> hosts; + private final Optional<String> clusterName; - private ClientProviderImpl(ImmutableList<Host> hosts) { + private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> clusterName) { Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at least one host"); this.hosts = hosts; + this.clusterName = clusterName; } @Override public Client get() { - TransportClient transportClient = TransportClient.builder().build(); + TransportClient transportClient = TransportClient.builder() + .settings(settings()) + .build(); ConsumerChainer<Host> consumer = Throwing.consumer(host -> transportClient .addTransportAddress( new InetSocketTransportAddress( @@ -65,4 +74,13 @@ public class ClientProviderImpl implements ClientProvider { hosts.forEach(consumer.sneakyThrow()); return transportClient; } + + @VisibleForTesting Settings settings() { + if (clusterName.isPresent()) { + return Settings.builder() + .put(CLUSTER_NAME_SETTING, clusterName.get()) + .build(); + } + return Settings.EMPTY; + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java index aa0a8e9..56f950d 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java @@ -19,6 +19,7 @@ package org.apache.james.backends.es; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.james.util.docker.SwarmGenericContainer; @@ -52,7 +53,7 @@ public class ClientProviderImplConnectionTest { Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300))); + .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300, Optional.empty()))); } @Test @@ -62,8 +63,9 @@ public class ClientProviderImplConnectionTest { .pollInterval(5, TimeUnit.SECONDS) .until(() -> isConnected( ClientProviderImpl.fromHostsString( - es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + "," - + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT))); + es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + "," + + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT + , Optional.empty()))); } @Test @@ -75,8 +77,9 @@ public class ClientProviderImplConnectionTest { .pollInterval(5, TimeUnit.SECONDS) .until(() -> isConnected( ClientProviderImpl.fromHostsString( - es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + "," - + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT))); + es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + "," + + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT + , Optional.empty()))); } private boolean isConnected(ClientProvider clientProvider) { http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java index a97eb64..95f1014 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java @@ -19,10 +19,15 @@ package org.apache.james.backends.es; +import static org.assertj.core.api.Assertions.assertThat; + +import org.elasticsearch.common.settings.Settings; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.Optional; + public class ClientProviderImplTest { @Rule @@ -32,111 +37,126 @@ public class ClientProviderImplTest { public void fromHostsStringShouldThrowOnNullString() { expectedException.expect(NullPointerException.class); - ClientProviderImpl.fromHostsString(null); + ClientProviderImpl.fromHostsString(null, Optional.empty()); } @Test public void fromHostsStringShouldThrowOnEmptyString() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString(""); + ClientProviderImpl.fromHostsString("", Optional.empty()); } @Test public void forHostShouldThrowOnNullHost() { expectedException.expect(NullPointerException.class); - ClientProviderImpl.forHost(null, 9200); + ClientProviderImpl.forHost(null, 9200, Optional.empty()); } @Test public void forHostShouldThrowOnEmptyHost() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.forHost("", 9200); + ClientProviderImpl.forHost("", 9200, Optional.empty()); } @Test public void forHostShouldThrowOnNegativePort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.forHost("localhost", -1); + ClientProviderImpl.forHost("localhost", -1, Optional.empty()); } @Test public void forHostShouldThrowOnZeroPort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.forHost("localhost", 0); + ClientProviderImpl.forHost("localhost", 0, Optional.empty()); } @Test public void forHostShouldThrowOnTooBigPort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.forHost("localhost", 65536); + ClientProviderImpl.forHost("localhost", 65536, Optional.empty()); } @Test public void fromHostsStringShouldEmptyAddress() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString(":9200"); + ClientProviderImpl.fromHostsString(":9200", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnAbsentPort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost"); + ClientProviderImpl.fromHostsString("localhost", Optional.empty()); } @Test public void fromHostsStringShouldThrowWhenTooMuchParts() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost:9200:9200"); + ClientProviderImpl.fromHostsString("localhost:9200:9200", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnEmptyPort() { expectedException.expect(NumberFormatException.class); - ClientProviderImpl.fromHostsString("localhost:"); + ClientProviderImpl.fromHostsString("localhost:", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnInvalidPort() { expectedException.expect(NumberFormatException.class); - ClientProviderImpl.fromHostsString("localhost:invalid"); + ClientProviderImpl.fromHostsString("localhost:invalid", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnNegativePort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost:-1"); + ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnZeroPort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost:0"); + ClientProviderImpl.fromHostsString("localhost:0", Optional.empty()); } @Test public void fromHostsStringShouldThrowOnTooBigPort() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost:65536"); + ClientProviderImpl.fromHostsString("localhost:65536", Optional.empty()); } @Test public void fromHostsStringShouldThrowIfOneHostIsInvalid() { expectedException.expect(IllegalArgumentException.class); - ClientProviderImpl.fromHostsString("localhost:9200,localhost"); + ClientProviderImpl.fromHostsString("localhost:9200,localhost", Optional.empty()); + } + + @Test + public void settingsShouldBeEmptyWhenClusterNameIsEmpty() { + ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty()); + + assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY); + } + + @Test + public void settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() { + String clusterName = "myClusterName"; + ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName)); + + assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java index ad05503..56b7fbc 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java @@ -144,7 +144,7 @@ public class ElasticSearchMailboxModule extends AbstractModule { ElasticSearchQuotaConfiguration quotaConfiguration) { LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now()); - Client client = ClientProviderImpl.fromHosts(configuration.getHosts()).get(); + Client client = ClientProviderImpl.fromHosts(configuration.getHosts(), configuration.getClusterName()).get(); MailboxIndexCreationUtil.prepareClient(client, mailboxConfiguration.getReadAliasMailboxName(), http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java ---------------------------------------------------------------------- diff --git a/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java b/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java index d602f8b..30820f9 100644 --- a/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java +++ b/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java @@ -21,6 +21,7 @@ package org.apache.james.metric.es; import static org.awaitility.Awaitility.await; +import java.util.Optional; import java.util.Timer; import java.util.TimerTask; @@ -67,7 +68,7 @@ public class ESReporterTest { @Before public void setUp() { - clientProvider = ClientProviderImpl.forHost(esContainer.getHostIp(), esContainer.getMappedPort(ES_APPLICATIVE_PORT)); + clientProvider = ClientProviderImpl.forHost(esContainer.getHostIp(), esContainer.getMappedPort(ES_APPLICATIVE_PORT), Optional.empty()); await().atMost(Duration.ONE_MINUTE) .until(() -> elasticSearchStarted(clientProvider)); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org