JAMES-1999 Elasticsearch does not retry connecting on error
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d258d900 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d258d900 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d258d900 Branch: refs/heads/master Commit: d258d900fea5c220480bcaad095942ecd59d2b12 Parents: 6c5912f Author: quynhn <qngu...@linagora.com> Authored: Fri Apr 14 09:48:31 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Fri Apr 21 07:52:19 2017 +0700 ---------------------------------------------------------------------- .../modules/mailbox/CassandraSessionModule.java | 43 ++++++++++++-------- .../mailbox/ElasticSearchMailboxModule.java | 26 ++++++------ 2 files changed, 40 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java index 8146c5a..8b92f99 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java @@ -29,6 +29,7 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.multibindings.Multibinder; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; +import com.nurkiewicz.asyncretry.function.RetryCallable; import org.apache.commons.configuration.ConfigurationException; import 
org.apache.commons.configuration.PropertiesConfiguration; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -40,6 +41,9 @@ import org.apache.james.backends.cassandra.init.QueryLoggerConfiguration; import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory; import org.apache.james.filesystem.api.FileSystem; import org.apache.james.util.Host; +import org.apache.james.utils.RetryExecutorUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.util.Arrays; @@ -51,6 +55,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class CassandraSessionModule extends AbstractModule { + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSessionModule.class); private static final int DEFAULT_CONNECTION_MAX_RETRIES = 10; private static final int DEFAULT_CONNECTION_MIN_DELAY = 5000; @@ -91,19 +96,30 @@ public class CassandraSessionModule extends AbstractModule { List<Host> servers = listCassandraServers(configuration); QueryLoggerConfiguration queryLoggerConfiguration = getCassandraQueryLoggerConf(configuration); - return getRetryer(executor, configuration) - .getWithRetry(ctx -> ClusterWithKeyspaceCreatedFactory - .config( - ClusterBuilder.builder() - .servers(servers) - .queryLoggerConfiguration(queryLoggerConfiguration) - .build(), - configuration.getString("cassandra.keyspace")) - .replicationFactor(configuration.getInt("cassandra.replication.factor")) - .clusterWithInitializedKeyspace()) + int maxRetries = configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES); + int minDelay = configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY); + + return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoHostAvailableException.class) + .getWithRetry(getClusterRetryCallable(configuration, servers, queryLoggerConfiguration)) 
.get(); } + private RetryCallable<Cluster> getClusterRetryCallable(PropertiesConfiguration configuration, List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) { + LOGGER.info("Trying to connect to Cassandra service"); + + return context -> ClusterWithKeyspaceCreatedFactory + .config(getCluster(servers, queryLoggerConfiguration), configuration.getString("cassandra.keyspace")) + .replicationFactor(configuration.getInt("cassandra.replication.factor")) + .clusterWithInitializedKeyspace(); + } + + private Cluster getCluster(List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) { + return ClusterBuilder.builder() + .servers(servers) + .queryLoggerConfiguration(queryLoggerConfiguration) + .build(); + } + private List<Host> listCassandraServers(PropertiesConfiguration configuration) { String[] ipAndPorts = configuration.getStringArray("cassandra.nodes"); @@ -154,13 +170,6 @@ public class CassandraSessionModule extends AbstractModule { return builder.build(); } - private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) { - return executor.retryOn(NoHostAvailableException.class) - .withProportionalJitter() - .withMaxRetries(configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES)) - .withMinDelay(configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY)); - } - @Provides private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) { return new AsyncRetryExecutor(scheduler); http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java index 4c3c9ef..48c4145 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java @@ -42,8 +42,11 @@ import org.apache.james.mailbox.extractor.TextExtractor; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.apache.james.mailbox.tika.extractor.TikaTextExtractor; +import org.apache.james.utils.RetryExecutorUtil; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.inject.AbstractModule; @@ -52,6 +55,7 @@ import com.google.inject.Scopes; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; public class ElasticSearchMailboxModule extends AbstractModule { + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class); public static final String ES_CONFIG_FILE = FileSystem.FILE_PROTOCOL_AND_CONF + "elasticsearch.properties"; public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts"; @@ -77,11 +81,17 @@ public class ElasticSearchMailboxModule extends AbstractModule { @Singleton protected Client provideClientProvider(FileSystem fileSystem, AsyncRetryExecutor executor) throws ConfigurationException, FileNotFoundException, ExecutionException, InterruptedException { PropertiesConfiguration propertiesReader = new PropertiesConfiguration(fileSystem.getFile(ES_CONFIG_FILE)); + int maxRetries = propertiesReader.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES); + int minDelay = 
propertiesReader.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY); - ClientProvider clientProvider = connectToCluster(propertiesReader); + return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoNodeAvailableException.class) + .getWithRetry(context -> connectToCluster(propertiesReader)) + .get(); + } - Client client = getRetryer(executor, propertiesReader) - .getWithRetry(ctx -> clientProvider.get()).get(); + private Client createClientAndIndex(ClientProvider clientProvider, PropertiesConfiguration propertiesReader) { + LOGGER.info("Trying to connect to ElasticSearch service"); + Client client = clientProvider.get(); IndexCreationFactory.createIndex(client, MailboxElasticsearchConstants.MAILBOX_INDEX, propertiesReader.getInt("elasticsearch.nb.shards"), @@ -122,15 +132,7 @@ public class ElasticSearchMailboxModule extends AbstractModule { } } - private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) { - return executor - .withProportionalJitter() - .retryOn(NoNodeAvailableException.class) - .withMaxRetries(configuration.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES)) - .withMinDelay(configuration.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY)); - } - - @Provides + @Provides @Singleton public IndexAttachments provideIndexAttachments(PropertiesConfiguration configuration) { if (configuration.getBoolean("elasticsearch.indexAttachments", DEFAULT_INDEX_ATTACHMENTS)) { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org