JAMES-2090 Cassandra guice should Inject CassandraConfiguration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0daadc62 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0daadc62 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0daadc62 Branch: refs/heads/master Commit: 0daadc62d2ac1b1371b5a9061f9e1b4b3f67cf15 Parents: 29fb5d3 Author: benwa <btell...@linagora.com> Authored: Thu Jul 6 08:27:18 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Thu Jul 6 08:28:26 2017 +0700 ---------------------------------------------------------------------- backends-common/cassandra/pom.xml | 6 + .../cassandra/CassandraConfiguration.java | 223 ++++++++++++++++++ .../cassandra/utils/CassandraUtils.java | 21 +- .../cassandra/CassandraConfigurationTest.java | 225 +++++++++++++++++++ .../CassandraMailboxSessionMapperFactory.java | 28 +-- .../CassandraMailboxPathRegisterMapper.java | 6 +- .../cassandra/mail/CassandraACLMapper.java | 10 +- .../mail/CassandraAnnotationMapper.java | 26 ++- .../mail/CassandraDeletedMessageDAO.java | 17 +- .../cassandra/mail/CassandraMailboxDAO.java | 22 +- .../cassandra/mail/CassandraMailboxMapper.java | 9 +- .../cassandra/mail/CassandraMailboxPathDAO.java | 12 +- .../mail/CassandraMailboxRecentsDAO.java | 12 +- .../cassandra/mail/CassandraMessageDAO.java | 14 +- .../cassandra/mail/CassandraMessageIdDAO.java | 12 +- .../mail/CassandraMessageIdMapper.java | 9 +- .../mail/CassandraMessageIdToImapUidDAO.java | 12 +- .../cassandra/mail/CassandraMessageMapper.java | 17 +- .../cassandra/mail/CassandraModSeqProvider.java | 18 +- .../cassandra/mail/CassandraUidProvider.java | 18 +- .../user/CassandraSubscriptionMapper.java | 8 +- .../CassandraMailboxManagerProvider.java | 3 +- .../cassandra/CassandraTestSystemFixture.java | 3 +- ...istributedMailboxDelegatingListenerTest.java | 4 + ...CassandraMailboxPathRegistrerMapperTest.java | 11 +- .../cassandra/mail/CassandraACLMapperTest.java | 18 +- .../cassandra/mail/CassandraMailboxDAOTest.java | 3 +- .../CassandraMailboxManagerAttachmentTest.java | 4 +- .../CassandraMailboxMapperConcurrencyTest.java | 6 +- .../mail/CassandraMailboxMapperTest.java | 8 +- .../cassandra/mail/CassandraMapperProvider.java | 2 +- .../mail/CassandraModSeqProviderTest.java | 7 +- .../mail/CassandraUidProviderTest.java | 7 +- .../user/CassandraSubscriptionMapperTest.java | 3 +- .../cassandra/host/CassandraHostSystem.java | 3 +- .../cassandra/host/CassandraHostSystem.java | 4 +- .../modules/mailbox/CassandraMailboxModule.java | 4 - .../modules/mailbox/CassandraSessionModule.java | 5 + .../cassandra/CassandraDomainList.java | 16 +- .../cassandra/CassandraUsersRepository.java | 9 +- .../cassandra/CassandraDomainListTest.java | 3 +- .../cassandra/CassandraUsersRepositoryTest.java | 5 +- 42 files changed, 695 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/pom.xml b/backends-common/cassandra/pom.xml index 4eaec71..6810208 100644 --- a/backends-common/cassandra/pom.xml +++ b/backends-common/cassandra/pom.xml @@ -168,6 +168,12 @@ <version>${assertj-3.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>nl.jqno.equalsverifier</groupId> + <artifactId>equalsverifier</artifactId> + <version>1.7.6</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.cassandraunit</groupId> <artifactId>cassandra-unit</artifactId> http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java new file mode 100644 index 0000000..3d985d4 --- /dev/null +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java @@ -0,0 +1,223 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra; + +import java.util.Objects; +import java.util.Optional; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; + +public class CassandraConfiguration { + public static final CassandraConfiguration DEFAULT_CONFIGURATION = builder().build(); + + public static final int DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ = 100; + public static final int DEFAULT_EXPUNGE_BATCH_SIZE = 100; + public static final int DEFAULT_UPDATE_FLAGS_BATCH_SIZE = 20; + public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY = 1000; + public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_ID_MAX_RETRY = 1000; + public static final int DEFAULT_MODSEQ_MAX_RETRY = 100000; + public static final int DEFAULT_UID_MAX_RETRY = 100000; + public static final int DEFAULT_ACL_MAX_RETRY = 1000; + public static final int DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100; + + public static class Builder { + private Optional<Integer> messageReadChunkSize = Optional.empty(); + private Optional<Integer> expungeChunkSize = Optional.empty(); + private Optional<Integer> flagsUpdateChunkSize = Optional.empty(); + private Optional<Integer> flagsUpdateMessageIdMaxRetry = Optional.empty(); + private Optional<Integer> flagsUpdateMessageMaxRetry = Optional.empty(); + private Optional<Integer> modSeqMaxRetry = Optional.empty(); + private Optional<Integer> uidMaxRetry = Optional.empty(); + private Optional<Integer> aclMaxRetry = Optional.empty(); + private Optional<Integer> fetchNextPageInAdvanceRow = Optional.empty(); + + public Builder messageReadChunkSize(int value) { + Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive"); + this.messageReadChunkSize = Optional.of(value); + return this; + } + + public Builder expungeChunkSize(int value) { + Preconditions.checkArgument(value > 0, "expungeChunkSize needs to be strictly positive"); + this.expungeChunkSize = Optional.of(value); + return this; + } + + public Builder flagsUpdateChunkSize(int value) { + Preconditions.checkArgument(value > 0, "flagsUpdateChunkSize needs to be strictly positive"); + this.flagsUpdateChunkSize = Optional.of(value); + return this; + } + + public Builder flagsUpdateMessageIdMaxRetry(int value) { + Preconditions.checkArgument(value > 0, "flagsUpdateMessageIdMaxRetry needs to be strictly positive"); + this.flagsUpdateMessageIdMaxRetry = Optional.of(value); + return this; + } + + public Builder flagsUpdateMessageMaxRetry(int value) { + Preconditions.checkArgument(value > 0, "flagsUpdateMessageMaxRetry needs to be strictly positive"); + this.flagsUpdateMessageMaxRetry = Optional.of(value); + return this; + } + + public Builder modSeqMaxRetry(int value) { + Preconditions.checkArgument(value > 0, "modSeqMaxRetry needs to be strictly positive"); + this.modSeqMaxRetry = Optional.of(value); + return this; + } + + public Builder uidMaxRetry(int value) { + Preconditions.checkArgument(value > 0, "uidMaxRetry needs to be strictly positive"); + this.uidMaxRetry = Optional.of(value); + return this; + } + + public Builder aclMaxRetry(int value) { + Preconditions.checkArgument(value > 0, "aclMaxRetry needs to be strictly positive"); + this.aclMaxRetry = Optional.of(value); + return this; + } + + public Builder fetchNextPageInAdvanceRow(int value) { + Preconditions.checkArgument(value > 0, "fetchNextPageInAdvanceRow needs to be strictly positive"); + this.fetchNextPageInAdvanceRow = Optional.of(value); + return this; + } + + public CassandraConfiguration build() { + return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY), + messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ), + expungeChunkSize.orElse(DEFAULT_EXPUNGE_BATCH_SIZE), + flagsUpdateChunkSize.orElse(DEFAULT_UPDATE_FLAGS_BATCH_SIZE), + flagsUpdateMessageIdMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_ID_MAX_RETRY), + flagsUpdateMessageMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY), + modSeqMaxRetry.orElse(DEFAULT_MODSEQ_MAX_RETRY), + uidMaxRetry.orElse(DEFAULT_UID_MAX_RETRY), + fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW)); + } + } + + public static Builder builder() { + return new Builder(); + } + + private final int messageReadChunkSize; + private final int expungeChunkSize; + private final int flagsUpdateChunkSize; + private final int flagsUpdateMessageIdMaxRetry; + private final int flagsUpdateMessageMaxRetry; + private final int modSeqMaxRetry; + private final int uidMaxRetry; + private final int aclMaxRetry; + private final int fetchNextPageInAdvanceRow; + + @VisibleForTesting + CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize, int flagsUpdateChunkSize, + int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry, + int uidMaxRetry, int fetchNextPageInAdvanceRow) { + this.aclMaxRetry = aclMaxRetry; + this.messageReadChunkSize = messageReadChunkSize; + this.expungeChunkSize = expungeChunkSize; + this.flagsUpdateMessageIdMaxRetry = flagsUpdateMessageIdMaxRetry; + this.flagsUpdateMessageMaxRetry = flagsUpdateMessageMaxRetry; + this.modSeqMaxRetry = modSeqMaxRetry; + this.uidMaxRetry = uidMaxRetry; + this.fetchNextPageInAdvanceRow = fetchNextPageInAdvanceRow; + this.flagsUpdateChunkSize = flagsUpdateChunkSize; + } + + public int getFlagsUpdateChunkSize() { + return flagsUpdateChunkSize; + } + + public int getAclMaxRetry() { + return aclMaxRetry; + } + + public int getMessageReadChunkSize() { + return messageReadChunkSize; + } + + public int getExpungeChunkSize() { + return expungeChunkSize; + } + + public int getFlagsUpdateMessageIdMaxRetry() { + return flagsUpdateMessageIdMaxRetry; + } + + public int getFlagsUpdateMessageMaxRetry() { + return flagsUpdateMessageMaxRetry; + } + + public int getModSeqMaxRetry() { + return modSeqMaxRetry; + } + + public int getUidMaxRetry() { + return uidMaxRetry; + } + + public int getFetchNextPageInAdvanceRow() { + return fetchNextPageInAdvanceRow; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof CassandraConfiguration) { + CassandraConfiguration that = (CassandraConfiguration) o; + + return Objects.equals(this.aclMaxRetry, that.aclMaxRetry) + && Objects.equals(this.messageReadChunkSize, that.messageReadChunkSize) + && Objects.equals(this.expungeChunkSize, that.expungeChunkSize) + && Objects.equals(this.flagsUpdateMessageIdMaxRetry, that.flagsUpdateMessageIdMaxRetry) + && Objects.equals(this.flagsUpdateMessageMaxRetry, that.flagsUpdateMessageMaxRetry) + && Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry) + && Objects.equals(this.uidMaxRetry, that.uidMaxRetry) + && Objects.equals(this.flagsUpdateChunkSize, that.flagsUpdateChunkSize) + && Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry, + flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("aclMaxRetry", aclMaxRetry) + .add("messageReadChunkSize", messageReadChunkSize) + .add("expungeChunkSize", expungeChunkSize) + .add("flagsUpdateMessageIdMaxRetry", flagsUpdateMessageIdMaxRetry) + .add("flagsUpdateMessageMaxRetry", flagsUpdateMessageMaxRetry) + .add("modSeqMaxRetry", modSeqMaxRetry) + .add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow) + .add("flagsUpdateChunkSize", flagsUpdateChunkSize) + .add("uidMaxRetry", uidMaxRetry) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java index 1732dd2..adef1c0 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java @@ -22,22 +22,35 @@ package org.apache.james.backends.cassandra.utils; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.apache.james.backends.cassandra.CassandraConfiguration; + import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; public class CassandraUtils { - private static final int FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100; + public static final CassandraUtils DEFAULT_CASSANDRA_UTILS = new CassandraUtils(CassandraConfiguration.DEFAULT_CONFIGURATION); + + private final CassandraConfiguration cassandraConfiguration; + + public CassandraUtils(CassandraConfiguration cassandraConfiguration) { + this.cassandraConfiguration = cassandraConfiguration; + } - public static Stream<Row> convertToStream(ResultSet resultSet) { + public Stream<Row> convertToStream(ResultSet resultSet) { return StreamSupport.stream(resultSet.spliterator(), true) .peek(row -> ensureFetchedNextPage(resultSet)); } - private static void ensureFetchedNextPage(ResultSet resultSet) { - if (resultSet.getAvailableWithoutFetching() == FETCH_NEXT_PAGE_ADVANCE_IN_ROW && !resultSet.isFullyFetched()) { + private void ensureFetchedNextPage(ResultSet resultSet) { + if (fetchNeeded(resultSet)) { resultSet.fetchMoreResults(); } } + private boolean fetchNeeded(ResultSet resultSet) { + return resultSet.getAvailableWithoutFetching() == cassandraConfiguration.getFetchNextPageInAdvanceRow() + && !resultSet.isFullyFetched(); + } + } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java new file mode 100644 index 0000000..dec62ea --- /dev/null +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java @@ -0,0 +1,225 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import nl.jqno.equalsverifier.EqualsVerifier; + +public class CassandraConfigurationTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void cassandraConfigurationShouldRespectBeanContract() { + EqualsVerifier.forClass(CassandraConfiguration.class).verify(); + } + + @Test + public void defaultBuilderShouldConstructDefaultConfiguration() { + assertThat(CassandraConfiguration.builder().build()) + .isEqualTo(CassandraConfiguration.DEFAULT_CONFIGURATION); + } + + @Test + public void aclMaxRetryShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .aclMaxRetry(-1); + } + + @Test + public void aclMaxRetryShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .aclMaxRetry(0); + } + + @Test + public void expungeChunkSizeShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .expungeChunkSize(-1); + } + + @Test + public void expungeChunkSizeShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .expungeChunkSize(0); + } + + @Test + public void messageReadChunkSizeShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .messageReadChunkSize(-1); + } + + @Test + public void messageReadChunkSizeShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .messageReadChunkSize(0); + } + + @Test + public void flagsUpdateChunkSizeShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateChunkSize(-1); + } + + @Test + public void flagsUpdateChunkSizeShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateChunkSize(0); + } + + @Test + public void flagsUpdateMessageIdMaxRetryShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateMessageIdMaxRetry(-1); + } + + @Test + public void flagsUpdateMessageIdMaxRetryShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateMessageIdMaxRetry(0); + } + + @Test + public void flagsUpdateMessageMaxRetryShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateMessageMaxRetry(-1); + } + + @Test + public void flagsUpdateMessageMaxRetryShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .flagsUpdateMessageMaxRetry(0); + } + + @Test + public void fetchNextPageInAdvanceRowShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .fetchNextPageInAdvanceRow(-1); + } + + @Test + public void fetchNextPageInAdvanceRowShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .fetchNextPageInAdvanceRow(0); + } + + @Test + public void modSeqMaxRetryShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .modSeqMaxRetry(-1); + } + + @Test + public void modSeqMaxRetryShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .modSeqMaxRetry(0); + } + + @Test + public void uidMaxRetryShouldThrowOnNegativeValue() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .uidMaxRetry(-1); + } + + @Test + public void uidMaxRetryShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .uidMaxRetry(0); + } + + @Test + public void builderShouldCreateTheRightObject() { + int aclMaxRetry = 1; + int modSeqMaxRetry = 2; + int uidMaxRetry = 3; + int fetchNextPageInAdvanceRow = 4; + int flagsUpdateMessageMaxRetry = 5; + int flagsUpdateMessageIdMaxRetry = 6; + int flagsUpdateChunkSize = 7; + int messageReadChunkSize = 8; + int expungeChunkSize = 9; + + CassandraConfiguration configuration = CassandraConfiguration.builder() + .aclMaxRetry(aclMaxRetry) + .modSeqMaxRetry(modSeqMaxRetry) + .uidMaxRetry(uidMaxRetry) + .fetchNextPageInAdvanceRow(fetchNextPageInAdvanceRow) + .flagsUpdateMessageMaxRetry(flagsUpdateMessageMaxRetry) + .flagsUpdateMessageIdMaxRetry(flagsUpdateMessageIdMaxRetry) + .flagsUpdateChunkSize(flagsUpdateChunkSize) + .messageReadChunkSize(messageReadChunkSize) + .expungeChunkSize(expungeChunkSize) + .build(); + + assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry); + assertThat(configuration.getModSeqMaxRetry()).isEqualTo(modSeqMaxRetry); + assertThat(configuration.getUidMaxRetry()).isEqualTo(uidMaxRetry); + assertThat(configuration.getFetchNextPageInAdvanceRow()).isEqualTo(fetchNextPageInAdvanceRow); + assertThat(configuration.getFlagsUpdateMessageMaxRetry()).isEqualTo(flagsUpdateMessageMaxRetry); + assertThat(configuration.getFlagsUpdateMessageIdMaxRetry()).isEqualTo(flagsUpdateMessageIdMaxRetry); + assertThat(configuration.getFlagsUpdateChunkSize()).isEqualTo(flagsUpdateChunkSize); + assertThat(configuration.getMessageReadChunkSize()).isEqualTo(messageReadChunkSize); + assertThat(configuration.getExpungeChunkSize()).isEqualTo(expungeChunkSize); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java index 36f536a..dc0d622 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java @@ -20,8 +20,9 @@ package org.apache.james.mailbox.cassandra; import javax.inject.Inject; -import javax.inject.Named; +import org.apache.james.backends.cassandra.CassandraConfiguration; +import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.mail.CassandraAnnotationMapper; import org.apache.james.mailbox.cassandra.mail.CassandraApplicableFlagDAO; @@ -59,8 +60,6 @@ import com.datastax.driver.core.Session; * */ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFactory { - public static final Integer DEFAULT_MAX_RETRY = 1000; - private final Session session; private final CassandraUidProvider uidProvider; private final CassandraModSeqProvider modSeqProvider; @@ -74,15 +73,16 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa private final CassandraMailboxPathDAO mailboxPathDAO; private final CassandraFirstUnseenDAO firstUnseenDAO; private final CassandraApplicableFlagDAO applicableFlagDAO; + private CassandraUtils cassandraUtils; + private CassandraConfiguration cassandraConfiguration; private final CassandraDeletedMessageDAO deletedMessageDAO; - private int maxRetry; @Inject public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, Session session, CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraDeletedMessageDAO deletedMessageDAO, @Named(CassandraMailboxDAO.MAX_ACL_RETRY) Integer maxRetry) { + CassandraDeletedMessageDAO deletedMessageDAO, CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) { this.uidProvider = uidProvider; this.modSeqProvider = modSeqProvider; this.session = session; @@ -96,13 +96,14 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa this.firstUnseenDAO = firstUnseenDAO; this.deletedMessageDAO = deletedMessageDAO; this.applicableFlagDAO = applicableFlagDAO; + this.cassandraUtils = cassandraUtils; + this.cassandraConfiguration = cassandraConfiguration; this.indexTableHandler = new CassandraIndexTableHandler( mailboxRecentsDAO, mailboxCounterDAO, firstUnseenDAO, applicableFlagDAO, deletedMessageDAO); - this.maxRetry = maxRetry; } public CassandraMailboxSessionMapperFactory( @@ -121,7 +122,8 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa CassandraDeletedMessageDAO deletedMesageDAO) { this(uidProvider, modSeqProvider, session, messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO, - mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO, DEFAULT_MAX_RETRY); + mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO, + CassandraUtils.DEFAULT_CASSANDRA_UTILS, CassandraConfiguration.DEFAULT_CONFIGURATION); } @Override @@ -130,7 +132,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa uidProvider, modSeqProvider, null, - maxRetry, (CassandraAttachmentMapper) createAttachmentMapper(mailboxSession), messageDAO, messageIdDAO, @@ -140,19 +141,20 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa applicableFlagDAO, indexTableHandler, firstUnseenDAO, - deletedMessageDAO); + deletedMessageDAO, + cassandraConfiguration); } @Override public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException { return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO, (CassandraAttachmentMapper) getAttachmentMapper(mailboxSession), - imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession); + imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession, cassandraConfiguration); } @Override public MailboxMapper createMailboxMapper(MailboxSession mailboxSession) { - return new CassandraMailboxMapper(session, mailboxDAO, mailboxPathDAO, maxRetry); + return new CassandraMailboxMapper(session, mailboxDAO, mailboxPathDAO, cassandraConfiguration); } @Override @@ -162,7 +164,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa @Override public SubscriptionMapper createSubscriptionMapper(MailboxSession mailboxSession) { - return new CassandraSubscriptionMapper(session); + return new CassandraSubscriptionMapper(session, cassandraUtils); } public ModSeqProvider getModSeqProvider() { @@ -180,6 +182,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa @Override public AnnotationMapper createAnnotationMapper(MailboxSession mailboxSession) throws MailboxException { - return new CassandraAnnotationMapper(session); + return new CassandraAnnotationMapper(session, cassandraUtils); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java index fd648f5..d1b83ef 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java @@ -44,11 +44,12 @@ public class CassandraMailboxPathRegisterMapper implements DistantMailboxPathReg private final Session session; private final CassandraTypesProvider typesProvider; private final int cassandraTimeOutInS; + private final CassandraUtils cassandraUtils; private final PreparedStatement insertStatement; private final PreparedStatement deleteStatement; private final PreparedStatement selectStatement; - public CassandraMailboxPathRegisterMapper(Session session, CassandraTypesProvider typesProvider, int cassandraTimeOutInS) { + public CassandraMailboxPathRegisterMapper(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils, int cassandraTimeOutInS) { this.session = session; this.typesProvider = typesProvider; this.cassandraTimeOutInS = cassandraTimeOutInS; @@ -61,11 +62,12 @@ public class CassandraMailboxPathRegisterMapper implements DistantMailboxPathReg .and(eq(CassandraMailboxPathRegisterTable.TOPIC, bindMarker()))); this.selectStatement = session.prepare(select().from(CassandraMailboxPathRegisterTable.TABLE_NAME) .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, bindMarker()))); + this.cassandraUtils = cassandraUtils; } @Override public Set<Topic> getTopics(MailboxPath mailboxPath) { - return CassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath)))) + return cassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath)))) .map(row -> new Topic(row.getString(CassandraMailboxPathRegisterTable.TOPIC))) .collect(Collectors.toSet()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java index 68698ca..7e1626d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.CassandraConstants; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; @@ -66,17 +67,16 @@ public class CassandraACLMapper { private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class); - public CassandraACLMapper(CassandraId cassandraId, Session session, CassandraAsyncExecutor cassandraAsyncExecutor, int maxRetry) { - this(cassandraId, session, cassandraAsyncExecutor, maxRetry, () -> {}); + public CassandraACLMapper(CassandraId cassandraId, Session session, CassandraAsyncExecutor cassandraAsyncExecutor, CassandraConfiguration cassandraConfiguration) { + this(cassandraId, session, cassandraAsyncExecutor, cassandraConfiguration, () -> {}); } - public CassandraACLMapper(CassandraId cassandraId, Session session, CassandraAsyncExecutor cassandraAsyncExecutor, int maxRetry, CodeInjector codeInjector) { - Preconditions.checkArgument(maxRetry > 0); + public CassandraACLMapper(CassandraId cassandraId, Session session, CassandraAsyncExecutor cassandraAsyncExecutor, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) { Preconditions.checkArgument(cassandraId != null); this.cassandraId = cassandraId; this.session = session; this.executor = cassandraAsyncExecutor; - this.maxRetry = maxRetry; + this.maxRetry = cassandraConfiguration.getAclMaxRetry(); this.codeInjector = codeInjector; } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java index bf8da4f..073227b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java @@ -28,13 +28,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.lte; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.github.steveash.guavate.Guavate; -import com.google.common.base.Ascii; -import com.google.common.base.Optional; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.cassandra.table.CassandraAnnotationTable; @@ -47,26 +46,30 @@ import org.apache.james.mailbox.store.transaction.NonTransactionalMapper; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Select; +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Ascii; import com.google.common.base.Preconditions; public class CassandraAnnotationMapper extends NonTransactionalMapper implements AnnotationMapper { private final Session session; + private final CassandraUtils cassandraUtils; - public CassandraAnnotationMapper(Session session) { + public CassandraAnnotationMapper(Session session, CassandraUtils cassandraUtils) { this.session = session; + this.cassandraUtils = cassandraUtils; } public List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId) { CassandraId cassandraId = (CassandraId)mailboxId; - return CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQuery(cassandraId))) + return cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQuery(cassandraId))) .map(this::toAnnotation) .collect(Collectors.toList()); } public List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { CassandraId cassandraId = (CassandraId)mailboxId; - return CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryForKeys(cassandraId, keys))) + return cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryForKeys(cassandraId, keys))) .map(this::toAnnotation) .collect(Collectors.toList()); } @@ -102,8 +105,11 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements @Override public boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { CassandraId cassandraId = (CassandraId)mailboxId; - Optional<Row> row = Optional.fromNullable(session.execute(getStoredAnnotationsQueryByKey(cassandraId, mailboxAnnotation.getKey().asString())) - .one()); + Optional<Row> row = Optional.ofNullable( + session.execute( + getStoredAnnotationsQueryByKey(cassandraId, + mailboxAnnotation.getKey().asString())) + .one()); return row.isPresent(); } @@ -146,13 +152,13 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements } private Stream<MailboxAnnotation> getAnnotationsByKeyWithAllDepth(CassandraId mailboxId, MailboxAnnotationKey key) { - return CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId, key.asString()))) + return cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId, key.asString()))) .map(this::toAnnotation) .filter(annotation -> key.isAncestorOrIsEqual(annotation.getKey())); } private Stream<MailboxAnnotation> getAnnotationsByKeyWithOneDepth(CassandraId mailboxId, MailboxAnnotationKey key) { - return CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId, key.asString()))) + return cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId, key.asString()))) .map(this::toAnnotation) .filter(annotation -> key.isParentOrIsEqual(annotation.getKey())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java index 10dabc6..c86d1e0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java @@ -43,6 +43,8 @@ import org.apache.james.mailbox.cassandra.CassandraId; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; + import org.apache.james.mailbox.model.MessageRange; public class CassandraDeletedMessageDAO { @@ -57,9 +59,10 @@ public class CassandraDeletedMessageDAO { private final PreparedStatement selectOneUidStatement; private final PreparedStatement selectBetweenUidStatement; private final PreparedStatement selectFromUidStatement; + private final CassandraUtils cassandraUtils; @Inject - public CassandraDeletedMessageDAO(Session session) { + public CassandraDeletedMessageDAO(Session session, CassandraUtils cassandraUtils) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.addStatement = prepareAddStatement(session); this.deleteStatement = prepareDeleteStatement(session); @@ -67,6 +70,12 @@ public class CassandraDeletedMessageDAO { this.selectOneUidStatement = prepareOneUidStatement(session); this.selectBetweenUidStatement = prepareBetweenUidStatement(session); this.selectFromUidStatement = prepareFromUidStatement(session); + this.cassandraUtils = cassandraUtils; + } + + @VisibleForTesting + public CassandraDeletedMessageDAO(Session session) { + this(session, CassandraUtils.DEFAULT_CASSANDRA_UTILS); } private PreparedStatement prepareAllUidStatement(Session session) { @@ -125,7 +134,7 @@ public class CassandraDeletedMessageDAO { public CompletableFuture<Stream<MessageUid>> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) { return retrieveResultSetOfDeletedMessage(cassandraId, range) - .thenApply(CassandraDeletedMessageDAO::resultSetToStream); + .thenApply(this::resultSetToStream); } private CompletableFuture<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) { @@ -143,8 +152,8 @@ public class CassandraDeletedMessageDAO { throw new UnsupportedOperationException(); } - private static Stream<MessageUid> resultSetToStream(ResultSet resultSet) { - return CassandraUtils.convertToStream(resultSet) + private Stream<MessageUid> resultSetToStream(ResultSet resultSet) { + return cassandraUtils.convertToStream(resultSet) .map(row -> MessageUid.of(row.getLong(UID))); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java index 3f6f86c..ca6cfd7 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java @@ -39,6 +39,7 @@ import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Named; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.CassandraUtils; @@ -55,14 +56,15 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.annotations.VisibleForTesting; public class CassandraMailboxDAO { - public static final String MAX_ACL_RETRY = "maxAclRetry"; private final CassandraAsyncExecutor executor; private final MailboxBaseTupleUtil mailboxBaseTupleUtil; private final Session session; - private final int maxAclRetry; + private final CassandraUtils cassandraUtils; + private CassandraConfiguration cassandraConfiguration; private final PreparedStatement readStatement; private final PreparedStatement listStatement; private final PreparedStatement deleteStatement; @@ -70,7 +72,7 @@ public class CassandraMailboxDAO { private final PreparedStatement updateStatement; @Inject - public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider, @Named(MAX_ACL_RETRY) Integer maxAclRetry) { + public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) { this.executor = new CassandraAsyncExecutor(session); this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider); this.session = session; @@ -79,7 +81,13 @@ public class CassandraMailboxDAO { this.deleteStatement = prepareDelete(session); this.listStatement = prepareList(session); this.readStatement = prepareRead(session); - this.maxAclRetry = maxAclRetry; + this.cassandraUtils = cassandraUtils; + this.cassandraConfiguration = cassandraConfiguration; + } + + @VisibleForTesting + public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider) { + this(session, typesProvider, CassandraUtils.DEFAULT_CASSANDRA_UTILS, CassandraConfiguration.DEFAULT_CONFIGURATION); } private PreparedStatement prepareInsert(Session session) { @@ -140,7 +148,7 @@ public class CassandraMailboxDAO { } private CompletableFuture<Optional<SimpleMailbox>> mailbox(CassandraId cassandraId, CompletableFuture<Optional<Row>> rowFuture) { - CompletableFuture<MailboxACL> aclCompletableFuture = new CassandraACLMapper(cassandraId, session, executor, maxAclRetry).getACL(); + CompletableFuture<MailboxACL> aclCompletableFuture = new CassandraACLMapper(cassandraId, session, executor, cassandraConfiguration).getACL(); return rowFuture.thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow)) .thenApply(mailboxOptional -> { mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId)); @@ -163,7 +171,7 @@ public class CassandraMailboxDAO { public CompletableFuture<Stream<SimpleMailbox>> retrieveAllMailboxes() { return executor.execute(listStatement.bind()) - .thenApply(CassandraUtils::convertToStream) + .thenApply(cassandraUtils::convertToStream) .thenApply(stream -> stream.map(this::toMailboxWithId)) .thenCompose(stream -> CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture))); } @@ -176,7 +184,7 @@ public class CassandraMailboxDAO { private CompletableFuture<SimpleMailbox> toMailboxWithAclFuture(SimpleMailbox mailbox) { CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - return new CassandraACLMapper(cassandraId, session, executor, maxAclRetry).getACL() + return new CassandraACLMapper(cassandraId, session, executor, cassandraConfiguration).getACL() .thenApply(acl -> { mailbox.setACL(acl); return mailbox; http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index 6c0d484..ef0362a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -29,6 +29,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.exception.MailboxException; @@ -55,18 +56,18 @@ public class CassandraMailboxMapper implements MailboxMapper { public static final String VALUES_MAY_NOT_BE_LARGER_THAN_64_K = "Index expression values may not be larger than 64K"; public static final String CLUSTERING_COLUMNS_IS_TOO_LONG = "The sum of all clustering columns is too long"; - private final int maxRetry; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final CassandraMailboxPathDAO mailboxPathDAO; private final CassandraMailboxDAO mailboxDAO; private final Session session; + private final CassandraConfiguration cassandraConfiguration; - public CassandraMailboxMapper(Session session, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, int maxRetry) { - this.maxRetry = maxRetry; + public CassandraMailboxMapper(Session session, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.mailboxDAO = mailboxDAO; this.mailboxPathDAO = mailboxPathDAO; this.session = session; + this.cassandraConfiguration = cassandraConfiguration; } @Override @@ -193,7 +194,7 @@ public class CassandraMailboxMapper implements MailboxMapper { @Override public void updateACL(Mailbox mailbox, MailboxACL.MailboxACLCommand mailboxACLCommand) throws MailboxException { CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - new CassandraACLMapper(cassandraId, session, cassandraAsyncExecutor, maxRetry).updateACL(mailboxACLCommand); + new CassandraACLMapper(cassandraId, session, cassandraAsyncExecutor, cassandraConfiguration).updateACL(mailboxACLCommand); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java index 552e010..15d4a94 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java @@ -47,6 +47,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; public class CassandraMailboxPathDAO { @@ -87,21 +88,28 @@ public class CassandraMailboxPathDAO { private final CassandraAsyncExecutor cassandraAsyncExecutor; private final MailboxBaseTupleUtil mailboxBaseTupleUtil; + private final CassandraUtils cassandraUtils; private final PreparedStatement delete; private final PreparedStatement insert; private final PreparedStatement select; private final PreparedStatement selectAll; @Inject - public CassandraMailboxPathDAO(Session session, CassandraTypesProvider typesProvider) { + public CassandraMailboxPathDAO(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider); + this.cassandraUtils = cassandraUtils; this.insert = prepareInsert(session); this.delete = prepareDelete(session); this.select = prepareSelect(session); this.selectAll = prepareSelectAll(session); } + @VisibleForTesting + public CassandraMailboxPathDAO(Session session, CassandraTypesProvider typesProvider) { + this(session, typesProvider, CassandraUtils.DEFAULT_CASSANDRA_UTILS); + } + private PreparedStatement prepareDelete(Session session) { return session.prepare(QueryBuilder.delete() .from(TABLE_NAME) @@ -145,7 +153,7 @@ public class CassandraMailboxPathDAO { return cassandraAsyncExecutor.execute( selectAll.bind() .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user))) - .thenApply(resultSet -> CassandraUtils.convertToStream(resultSet).map(this::fromRowToCassandraIdAndPath)); + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet).map(this::fromRowToCassandraIdAndPath)); } private CassandraIdAndPath fromRowToCassandraIdAndPath(Row row) { http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java index f4eddf9..f3264f4 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; public class CassandraMailboxRecentsDAO { @@ -46,13 +47,20 @@ public class CassandraMailboxRecentsDAO { private final PreparedStatement readStatement; private final PreparedStatement deleteStatement; private final PreparedStatement addStatement; + private CassandraUtils cassandraUtils; @Inject - public CassandraMailboxRecentsDAO(Session session) { + public CassandraMailboxRecentsDAO(Session session, CassandraUtils cassandraUtils) { cassandraAsyncExecutor = new CassandraAsyncExecutor(session); readStatement = createReadStatement(session); deleteStatement = createDeleteStatement(session); addStatement = createAddStatement(session); + this.cassandraUtils = cassandraUtils; + } + + @VisibleForTesting + public CassandraMailboxRecentsDAO(Session session) { + this(session, CassandraUtils.DEFAULT_CASSANDRA_UTILS); } private PreparedStatement createReadStatement(Session session) { @@ -79,7 +87,7 @@ public class CassandraMailboxRecentsDAO { public CompletableFuture<Stream<MessageUid>> getRecentMessageUidsInMailbox(CassandraId mailboxId) { return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, readStatement)) - .thenApply(CassandraUtils::convertToStream) + .thenApply(cassandraUtils::convertToStream) .thenApply(stream -> stream.map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))) .thenApply(stream -> stream.map(MessageUid::of)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index b5101c3..9dd3d21 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -55,6 +55,7 @@ import javax.mail.Flags; import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.MessageUid; @@ -93,8 +94,6 @@ import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; public class CassandraMessageDAO { - - public static final int CHUNK_SIZE_ON_READ = 100; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final CassandraTypesProvider typesProvider; private final PreparedStatement insert; @@ -103,9 +102,10 @@ public class CassandraMessageDAO { private final PreparedStatement selectHeaders; private final PreparedStatement selectFields; private final PreparedStatement selectBody; + private final CassandraConfiguration cassandraConfiguration; @Inject - public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider) { + public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.typesProvider = typesProvider; this.insert = prepareInsert(session); @@ -114,6 +114,12 @@ public class CassandraMessageDAO { this.selectHeaders = prepareSelect(session, HEADERS); this.selectFields = prepareSelect(session, FIELDS); this.selectBody = prepareSelect(session, BODY); + this.cassandraConfiguration = cassandraConfiguration; + } + + @VisibleForTesting + public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider) { + this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION); } private PreparedStatement prepareSelect(Session session, String[] fields) { @@ -193,7 +199,7 @@ public class CassandraMessageDAO { public CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Optional<Integer> limit) { return CompletableFutureUtil.chainAll( getLimitedIdStream(messageIds.stream().distinct(), limit) - .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)), + .collect(JamesCollectors.chunker(cassandraConfiguration.getMessageReadChunkSize())), ids -> FluentFutureStream.of( ids.stream() .map(id -> retrieveRow(id, fetchType) http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java index e7d5a34..6f9c265 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java @@ -65,6 +65,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; public class CassandraMessageIdDAO { @@ -80,10 +81,11 @@ public class CassandraMessageIdDAO { private final PreparedStatement selectAllUids; private final PreparedStatement selectUidGte; private final PreparedStatement selectUidRange; + private CassandraUtils cassandraUtils; private final PreparedStatement update; @Inject - public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory) { + public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory, CassandraUtils cassandraUtils) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.messageIdFactory = messageIdFactory; this.delete = prepareDelete(session); @@ -93,6 +95,12 @@ public class CassandraMessageIdDAO { this.selectAllUids = prepareSelectAllUids(session); this.selectUidGte = prepareSelectUidGte(session); this.selectUidRange = prepareSelectUidRange(session); + this.cassandraUtils = cassandraUtils; + } + + @VisibleForTesting + public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory) { + this(session, messageIdFactory, CassandraUtils.DEFAULT_CASSANDRA_UTILS); } private PreparedStatement prepareDelete(Session session) { @@ -253,7 +261,7 @@ public class CassandraMessageIdDAO { private CompletableFuture<Stream<ComposedMessageIdWithMetaData>> toMessageIds(CompletableFuture<ResultSet> completableFuture) { return completableFuture - .thenApply(resultSet -> CassandraUtils.convertToStream(resultSet) + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) .map(this::fromRowToComposedMessageIdWithFlags)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index bf3dc1d..91e7b3a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -29,6 +29,7 @@ import java.util.stream.Stream; import javax.mail.Flags; import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.mailbox.MailboxSession; @@ -60,8 +61,6 @@ import com.github.steveash.guavate.Guavate; import com.google.common.base.Throwables; public class CassandraMessageIdMapper implements MessageIdMapper { - - private static final int MAX_RETRY = 1000; private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); private final MailboxMapper mailboxMapper; @@ -73,10 +72,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private final ModSeqProvider modSeqProvider; private final MailboxSession mailboxSession; private final AttachmentLoader attachmentLoader; + private final CassandraConfiguration cassandraConfiguration; public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO, - CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) { + CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession, CassandraConfiguration cassandraConfiguration) { this.mailboxMapper = mailboxMapper; this.mailboxDAO = mailboxDAO; this.imapUidDAO = imapUidDAO; @@ -86,6 +86,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { this.modSeqProvider = modSeqProvider; this.mailboxSession = mailboxSession; this.attachmentLoader = new AttachmentLoader(attachmentMapper); + this.cassandraConfiguration = cassandraConfiguration; } @Override @@ -223,7 +224,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private Stream<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { try { - Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(MAX_RETRY) + Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry()) .executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId)); ComposedMessageIdWithMetaData composedMessageIdWithMetaData = pair.getRight(); Flags oldFlags = pair.getLeft(); http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java index 3e1bbbd..38a02d6 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java @@ -62,6 +62,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; public class CassandraMessageIdToImapUidDAO { @@ -75,9 +76,10 @@ public class CassandraMessageIdToImapUidDAO { private final PreparedStatement update; private final PreparedStatement selectAll; private final PreparedStatement select; + private CassandraUtils cassandraUtils; @Inject - public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory) { + public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory, CassandraUtils cassandraUtils) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.messageIdFactory = messageIdFactory; this.delete = prepareDelete(session); @@ -85,6 +87,12 @@ public class CassandraMessageIdToImapUidDAO { this.update = prepareUpdate(session); this.selectAll = prepareSelectAll(session); this.select = prepareSelect(session); + this.cassandraUtils = cassandraUtils; + } + + @VisibleForTesting + public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory) { + this(session, messageIdFactory, CassandraUtils.DEFAULT_CASSANDRA_UTILS); } private PreparedStatement prepareDelete(Session session) { @@ -185,7 +193,7 @@ public class CassandraMessageIdToImapUidDAO { public CompletableFuture<Stream<ComposedMessageIdWithMetaData>> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { return selectStatement(messageId, mailboxId) - .thenApply(resultSet -> CassandraUtils.convertToStream(resultSet) + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) .map(this::toComposedMessageIdWithMetadata)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 2a6e74d..55bf43d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -32,6 +32,7 @@ import javax.mail.Flags; import javax.mail.Flags.Flag; import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.mailbox.ApplicableFlagBuilder; import org.apache.james.mailbox.FlagsBuilder; import org.apache.james.mailbox.MailboxSession; @@ -67,14 +68,11 @@ public class CassandraMessageMapper implements MessageMapper { .count(0L) .unseen(0L) .build(); - public static final int EXPUNGE_BATCH_SIZE = 100; - public static final int UPDATE_FLAGS_BATCH_SIZE = 20; public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); private final CassandraModSeqProvider modSeqProvider; private final MailboxSession mailboxSession; private final CassandraUidProvider uidProvider; - private final int maxRetries; private final CassandraMessageDAO messageDAO; private final CassandraMessageIdDAO messageIdDAO; private final CassandraMessageIdToImapUidDAO imapUidDAO; @@ -85,16 +83,16 @@ public class CassandraMessageMapper implements MessageMapper { private final CassandraFirstUnseenDAO firstUnseenDAO; private final AttachmentLoader attachmentLoader; private final CassandraDeletedMessageDAO deletedMessageDAO; + private final CassandraConfiguration cassandraConfiguration; public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, - MailboxSession mailboxSession, int maxRetries, CassandraAttachmentMapper attachmentMapper, + MailboxSession mailboxSession, CassandraAttachmentMapper attachmentMapper, CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO) { + CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) { this.uidProvider = uidProvider; this.modSeqProvider = modSeqProvider; this.mailboxSession = mailboxSession; - this.maxRetries = maxRetries; this.messageDAO = messageDAO; this.messageIdDAO = messageIdDAO; this.imapUidDAO = imapUidDAO; @@ -105,6 +103,7 @@ public class CassandraMessageMapper implements MessageMapper { this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.applicableFlagDAO = applicableFlagDAO; this.deletedMessageDAO = deletedMessageDAO; + this.cassandraConfiguration = cassandraConfiguration; } @Override @@ -230,7 +229,7 @@ public class CassandraMessageMapper implements MessageMapper { return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) .join() - .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE)) + .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize())) .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) .flatMap(CompletableFuture::join) .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new)); @@ -303,7 +302,7 @@ public class CassandraMessageMapper implements MessageMapper { private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) { FlagsUpdateStageResult globalResult = firstResult; int retryCount = 0; - while (retryCount < maxRetries && globalResult.containsFailedResults()) { + while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) { retryCount++; FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()); globalResult = globalResult.keepSucceded().merge(stageResult); @@ -323,7 +322,7 @@ public class CassandraMessageMapper implements MessageMapper { private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())); - return toBeUpdated.collect(JamesCollectors.chunker(UPDATE_FLAGS_BATCH_SIZE)) + return toBeUpdated.collect(JamesCollectors.chunker(cassandraConfiguration.getFlagsUpdateChunkSize())) .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk)) .map(CompletableFuture::join) .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge); http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index 1a6a885..986fc74 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -35,6 +35,7 @@ import java.util.concurrent.CompletionException; import javax.inject.Inject; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.MailboxSession; @@ -46,6 +47,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; public class CassandraModSeqProvider implements ModSeqProvider { @@ -76,7 +78,6 @@ public class CassandraModSeqProvider implements ModSeqProvider { } } - private static final int DEFAULT_MAX_RETRY = 100000; private static final ModSeq FIRST_MODSEQ = new ModSeq(0); private final CassandraAsyncExecutor cassandraAsyncExecutor; @@ -85,14 +86,20 @@ public class CassandraModSeqProvider implements ModSeqProvider { private final PreparedStatement update; private final PreparedStatement insert; - public CassandraModSeqProvider(Session session, int maxRetry) { + @Inject + public CassandraModSeqProvider(Session session, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); - this.runner = new FunctionRunnerWithRetry(maxRetry); + this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getModSeqMaxRetry()); this.insert = prepareInsert(session); this.update = prepareUpdate(session); this.select = prepareSelect(session); } + @VisibleForTesting + public CassandraModSeqProvider(Session session) { + this(session, CassandraConfiguration.DEFAULT_CONFIGURATION); + } + private PreparedStatement prepareInsert(Session session) { return session.prepare(insertInto(TABLE_NAME) .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ)) @@ -113,10 +120,7 @@ public class CassandraModSeqProvider implements ModSeqProvider { .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } - @Inject - public CassandraModSeqProvider(Session session) { - this(session, DEFAULT_MAX_RETRY); - } + @Override public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index 4aeb483..8f738f4 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import javax.inject.Inject; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.MailboxSession; @@ -47,9 +48,9 @@ import org.apache.james.util.OptionalConverter; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; public class CassandraUidProvider implements UidProvider { - private static final int DEFAULT_MAX_RETRY = 100000; private static final String CONDITION = "Condition"; private final CassandraAsyncExecutor executor; @@ -58,14 +59,20 @@ public class CassandraUidProvider implements UidProvider { private final PreparedStatement updateStatement; private final PreparedStatement selectStatement; - public CassandraUidProvider(Session session, int maxRetry) { + @Inject + public CassandraUidProvider(Session session, CassandraConfiguration cassandraConfiguration) { this.executor = new CassandraAsyncExecutor(session); - this.runner = new FunctionRunnerWithRetry(maxRetry); + this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getUidMaxRetry()); this.selectStatement = prepareSelect(session); this.updateStatement = prepareUpdate(session); this.insertStatement = prepareInsert(session); } + @VisibleForTesting + public CassandraUidProvider(Session session) { + this(session, CassandraConfiguration.DEFAULT_CONFIGURATION); + } + private PreparedStatement prepareSelect(Session session) { return session.prepare(select(NEXT_UID) .from(TABLE_NAME) @@ -86,11 +93,6 @@ public class CassandraUidProvider implements UidProvider { .ifNotExists()); } - @Inject - public CassandraUidProvider(Session session) { - this(session, DEFAULT_MAX_RETRY); - } - @Override public MessageUid nextUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { return nextUid(mailboxSession, mailbox.getMailboxId()); http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java index 4f78655..a1ce340 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java @@ -42,9 +42,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; public class CassandraSubscriptionMapper extends NonTransactionalMapper implements SubscriptionMapper { private final Session session; + private CassandraUtils cassandraUtils; - public CassandraSubscriptionMapper(Session session) { + public CassandraSubscriptionMapper(Session session, CassandraUtils cassandraUtils) { this.session = session; + this.cassandraUtils = cassandraUtils; } @Override @@ -66,7 +68,7 @@ public class CassandraSubscriptionMapper extends NonTransactionalMapper implemen @Override public List<Subscription> findSubscriptionsForUser(String user) { - return CassandraUtils.convertToStream( + return cassandraUtils.convertToStream( session.execute(select(MAILBOX) .from(TABLE_NAME) .where(eq(USER, user)))) @@ -82,7 +84,7 @@ public class CassandraSubscriptionMapper extends NonTransactionalMapper implemen } public List<SimpleSubscription> list() { - return CassandraUtils.convertToStream( + return cassandraUtils.convertToStream( session.execute(select(FIELDS) .from(TABLE_NAME))) .map((row) -> new SimpleSubscription(row.getString(USER), row.getString(MAILBOX))) http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java index d10b7c3..c34effb 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java @@ -48,7 +48,6 @@ import com.google.common.base.Throwables; public class CassandraMailboxManagerProvider { private static final int LIMIT_ANNOTATIONS = 3; private static final int LIMIT_ANNOTATION_SIZE = 30; - public static final int MAX_ACL_RETRY = 10; public static CassandraMailboxManager provideMailboxManager(Session session, CassandraTypesProvider cassandraTypesProvider) { CassandraUidProvider uidProvider = new CassandraUidProvider(session); @@ -59,7 +58,7 @@ public class CassandraMailboxManagerProvider { CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, cassandraTypesProvider); CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session); CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(session); - CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider, MAX_ACL_RETRY); + CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider); CassandraMailboxPathDAO mailboxPathDAO = new CassandraMailboxPathDAO(session, cassandraTypesProvider); CassandraFirstUnseenDAO firstUnseenDAO = new CassandraFirstUnseenDAO(session); CassandraApplicableFlagDAO applicableFlagDAO = new CassandraApplicableFlagDAO(session); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org