JAMES-2090 Cassandra guice should Inject CassandraConfiguration

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0daadc62
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0daadc62
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0daadc62

Branch: refs/heads/master
Commit: 0daadc62d2ac1b1371b5a9061f9e1b4b3f67cf15
Parents: 29fb5d3
Author: benwa <btell...@linagora.com>
Authored: Thu Jul 6 08:27:18 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Thu Jul 6 08:28:26 2017 +0700

----------------------------------------------------------------------
 backends-common/cassandra/pom.xml               |   6 +
 .../cassandra/CassandraConfiguration.java       | 223 ++++++++++++++++++
 .../cassandra/utils/CassandraUtils.java         |  21 +-
 .../cassandra/CassandraConfigurationTest.java   | 225 +++++++++++++++++++
 .../CassandraMailboxSessionMapperFactory.java   |  28 +--
 .../CassandraMailboxPathRegisterMapper.java     |   6 +-
 .../cassandra/mail/CassandraACLMapper.java      |  10 +-
 .../mail/CassandraAnnotationMapper.java         |  26 ++-
 .../mail/CassandraDeletedMessageDAO.java        |  17 +-
 .../cassandra/mail/CassandraMailboxDAO.java     |  22 +-
 .../cassandra/mail/CassandraMailboxMapper.java  |   9 +-
 .../cassandra/mail/CassandraMailboxPathDAO.java |  12 +-
 .../mail/CassandraMailboxRecentsDAO.java        |  12 +-
 .../cassandra/mail/CassandraMessageDAO.java     |  14 +-
 .../cassandra/mail/CassandraMessageIdDAO.java   |  12 +-
 .../mail/CassandraMessageIdMapper.java          |   9 +-
 .../mail/CassandraMessageIdToImapUidDAO.java    |  12 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  17 +-
 .../cassandra/mail/CassandraModSeqProvider.java |  18 +-
 .../cassandra/mail/CassandraUidProvider.java    |  18 +-
 .../user/CassandraSubscriptionMapper.java       |   8 +-
 .../CassandraMailboxManagerProvider.java        |   3 +-
 .../cassandra/CassandraTestSystemFixture.java   |   3 +-
 ...istributedMailboxDelegatingListenerTest.java |   4 +
 ...CassandraMailboxPathRegistrerMapperTest.java |  11 +-
 .../cassandra/mail/CassandraACLMapperTest.java  |  18 +-
 .../cassandra/mail/CassandraMailboxDAOTest.java |   3 +-
 .../CassandraMailboxManagerAttachmentTest.java  |   4 +-
 .../CassandraMailboxMapperConcurrencyTest.java  |   6 +-
 .../mail/CassandraMailboxMapperTest.java        |   8 +-
 .../cassandra/mail/CassandraMapperProvider.java |   2 +-
 .../mail/CassandraModSeqProviderTest.java       |   7 +-
 .../mail/CassandraUidProviderTest.java          |   7 +-
 .../user/CassandraSubscriptionMapperTest.java   |   3 +-
 .../cassandra/host/CassandraHostSystem.java     |   3 +-
 .../cassandra/host/CassandraHostSystem.java     |   4 +-
 .../modules/mailbox/CassandraMailboxModule.java |   4 -
 .../modules/mailbox/CassandraSessionModule.java |   5 +
 .../cassandra/CassandraDomainList.java          |  16 +-
 .../cassandra/CassandraUsersRepository.java     |   9 +-
 .../cassandra/CassandraDomainListTest.java      |   3 +-
 .../cassandra/CassandraUsersRepositoryTest.java |   5 +-
 42 files changed, 695 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/pom.xml b/backends-common/cassandra/pom.xml
index 4eaec71..6810208 100644
--- a/backends-common/cassandra/pom.xml
+++ b/backends-common/cassandra/pom.xml
@@ -168,6 +168,12 @@
                     <version>${assertj-3.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>nl.jqno.equalsverifier</groupId>
+                    <artifactId>equalsverifier</artifactId>
+                    <version>1.7.6</version>
+                    <scope>test</scope>
+                </dependency>
                        <dependency>
                            <groupId>org.cassandraunit</groupId>
                            <artifactId>cassandra-unit</artifactId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
new file mode 100644
index 0000000..3d985d4
--- /dev/null
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
@@ -0,0 +1,223 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+public class CassandraConfiguration {
+    public static final CassandraConfiguration DEFAULT_CONFIGURATION = 
builder().build();
+
+    public static final int DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ = 100;
+    public static final int DEFAULT_EXPUNGE_BATCH_SIZE = 100;
+    public static final int DEFAULT_UPDATE_FLAGS_BATCH_SIZE = 20;
+    public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY = 1000;
+    public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_ID_MAX_RETRY = 1000;
+    public static final int DEFAULT_MODSEQ_MAX_RETRY = 100000;
+    public static final int DEFAULT_UID_MAX_RETRY = 100000;
+    public static final int DEFAULT_ACL_MAX_RETRY = 1000;
+    public static final int DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100;
+
+    public static class Builder {
+        private Optional<Integer> messageReadChunkSize = Optional.empty();
+        private Optional<Integer> expungeChunkSize = Optional.empty();
+        private Optional<Integer> flagsUpdateChunkSize = Optional.empty();
+        private Optional<Integer> flagsUpdateMessageIdMaxRetry = 
Optional.empty();
+        private Optional<Integer> flagsUpdateMessageMaxRetry = 
Optional.empty();
+        private Optional<Integer> modSeqMaxRetry = Optional.empty();
+        private Optional<Integer> uidMaxRetry = Optional.empty();
+        private Optional<Integer> aclMaxRetry = Optional.empty();
+        private Optional<Integer> fetchNextPageInAdvanceRow = Optional.empty();
+
+        public Builder messageReadChunkSize(int value) {
+            Preconditions.checkArgument(value > 0, "messageReadChunkSize needs 
to be strictly positive");
+            this.messageReadChunkSize = Optional.of(value);
+            return this;
+        }
+
+        public Builder expungeChunkSize(int value) {
+            Preconditions.checkArgument(value > 0, "expungeChunkSize needs to 
be strictly positive");
+            this.expungeChunkSize = Optional.of(value);
+            return this;
+        }
+
+        public Builder flagsUpdateChunkSize(int value) {
+            Preconditions.checkArgument(value > 0, "flagsUpdateChunkSize needs 
to be strictly positive");
+            this.flagsUpdateChunkSize = Optional.of(value);
+            return this;
+        }
+
+        public Builder flagsUpdateMessageIdMaxRetry(int value) {
+            Preconditions.checkArgument(value > 0, 
"flagsUpdateMessageIdMaxRetry needs to be strictly positive");
+            this.flagsUpdateMessageIdMaxRetry = Optional.of(value);
+            return this;
+        }
+
+        public Builder flagsUpdateMessageMaxRetry(int value) {
+            Preconditions.checkArgument(value > 0, "flagsUpdateMessageMaxRetry 
needs to be strictly positive");
+            this.flagsUpdateMessageMaxRetry = Optional.of(value);
+            return this;
+        }
+
+        public Builder modSeqMaxRetry(int value) {
+            Preconditions.checkArgument(value > 0, "modSeqMaxRetry needs to be 
strictly positive");
+            this.modSeqMaxRetry = Optional.of(value);
+            return this;
+        }
+
+        public Builder uidMaxRetry(int value) {
+            Preconditions.checkArgument(value > 0, "uidMaxRetry needs to be 
strictly positive");
+            this.uidMaxRetry = Optional.of(value);
+            return this;
+        }
+
+        public Builder aclMaxRetry(int value) {
+            Preconditions.checkArgument(value > 0, "aclMaxRetry needs to be 
strictly positive");
+            this.aclMaxRetry = Optional.of(value);
+            return this;
+        }
+
+        public Builder fetchNextPageInAdvanceRow(int value) {
+            Preconditions.checkArgument(value > 0, "fetchNextPageInAdvanceRow 
needs to be strictly positive");
+            this.fetchNextPageInAdvanceRow = Optional.of(value);
+            return this;
+        }
+
+        public CassandraConfiguration build() {
+            return new 
CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
+                
messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
+                expungeChunkSize.orElse(DEFAULT_EXPUNGE_BATCH_SIZE),
+                flagsUpdateChunkSize.orElse(DEFAULT_UPDATE_FLAGS_BATCH_SIZE),
+                
flagsUpdateMessageIdMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_ID_MAX_RETRY),
+                
flagsUpdateMessageMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY),
+                modSeqMaxRetry.orElse(DEFAULT_MODSEQ_MAX_RETRY),
+                uidMaxRetry.orElse(DEFAULT_UID_MAX_RETRY),
+                
fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW));
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    private final int messageReadChunkSize;
+    private final int expungeChunkSize;
+    private final int flagsUpdateChunkSize;
+    private final int flagsUpdateMessageIdMaxRetry;
+    private final int flagsUpdateMessageMaxRetry;
+    private final int modSeqMaxRetry;
+    private final int uidMaxRetry;
+    private final int aclMaxRetry;
+    private final int fetchNextPageInAdvanceRow;
+
+    @VisibleForTesting
+    CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int 
expungeChunkSize, int flagsUpdateChunkSize,
+                                  int flagsUpdateMessageIdMaxRetry, int 
flagsUpdateMessageMaxRetry, int modSeqMaxRetry,
+                                   int uidMaxRetry, int 
fetchNextPageInAdvanceRow) {
+        this.aclMaxRetry = aclMaxRetry;
+        this.messageReadChunkSize = messageReadChunkSize;
+        this.expungeChunkSize = expungeChunkSize;
+        this.flagsUpdateMessageIdMaxRetry = flagsUpdateMessageIdMaxRetry;
+        this.flagsUpdateMessageMaxRetry = flagsUpdateMessageMaxRetry;
+        this.modSeqMaxRetry = modSeqMaxRetry;
+        this.uidMaxRetry = uidMaxRetry;
+        this.fetchNextPageInAdvanceRow = fetchNextPageInAdvanceRow;
+        this.flagsUpdateChunkSize = flagsUpdateChunkSize;
+    }
+
+    public int getFlagsUpdateChunkSize() {
+        return flagsUpdateChunkSize;
+    }
+
+    public int getAclMaxRetry() {
+        return aclMaxRetry;
+    }
+
+    public int getMessageReadChunkSize() {
+        return messageReadChunkSize;
+    }
+
+    public int getExpungeChunkSize() {
+        return expungeChunkSize;
+    }
+
+    public int getFlagsUpdateMessageIdMaxRetry() {
+        return flagsUpdateMessageIdMaxRetry;
+    }
+
+    public int getFlagsUpdateMessageMaxRetry() {
+        return flagsUpdateMessageMaxRetry;
+    }
+
+    public int getModSeqMaxRetry() {
+        return modSeqMaxRetry;
+    }
+
+    public int getUidMaxRetry() {
+        return uidMaxRetry;
+    }
+
+    public int getFetchNextPageInAdvanceRow() {
+        return fetchNextPageInAdvanceRow;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof CassandraConfiguration) {
+            CassandraConfiguration that = (CassandraConfiguration) o;
+
+            return Objects.equals(this.aclMaxRetry, that.aclMaxRetry)
+                && Objects.equals(this.messageReadChunkSize, 
that.messageReadChunkSize)
+                && Objects.equals(this.expungeChunkSize, that.expungeChunkSize)
+                && Objects.equals(this.flagsUpdateMessageIdMaxRetry, 
that.flagsUpdateMessageIdMaxRetry)
+                && Objects.equals(this.flagsUpdateMessageMaxRetry, 
that.flagsUpdateMessageMaxRetry)
+                && Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry)
+                && Objects.equals(this.uidMaxRetry, that.uidMaxRetry)
+                && Objects.equals(this.flagsUpdateChunkSize, 
that.flagsUpdateChunkSize)
+                && Objects.equals(this.fetchNextPageInAdvanceRow, 
that.fetchNextPageInAdvanceRow);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(aclMaxRetry, messageReadChunkSize, 
expungeChunkSize, flagsUpdateMessageIdMaxRetry,
+            flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, 
fetchNextPageInAdvanceRow, flagsUpdateChunkSize);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("aclMaxRetry", aclMaxRetry)
+            .add("messageReadChunkSize", messageReadChunkSize)
+            .add("expungeChunkSize", expungeChunkSize)
+            .add("flagsUpdateMessageIdMaxRetry", flagsUpdateMessageIdMaxRetry)
+            .add("flagsUpdateMessageMaxRetry", flagsUpdateMessageMaxRetry)
+            .add("modSeqMaxRetry", modSeqMaxRetry)
+            .add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow)
+            .add("flagsUpdateChunkSize", flagsUpdateChunkSize)
+            .add("uidMaxRetry", uidMaxRetry)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
index 1732dd2..adef1c0 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
@@ -22,22 +22,35 @@ package org.apache.james.backends.cassandra.utils;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
+
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 
 public class CassandraUtils {
 
-    private static final int FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100;
+    public static final CassandraUtils DEFAULT_CASSANDRA_UTILS = new 
CassandraUtils(CassandraConfiguration.DEFAULT_CONFIGURATION);
+
+    private final CassandraConfiguration cassandraConfiguration;
+
+    public CassandraUtils(CassandraConfiguration cassandraConfiguration) {
+        this.cassandraConfiguration = cassandraConfiguration;
+    }
 
-    public static Stream<Row> convertToStream(ResultSet resultSet) {
+    public Stream<Row> convertToStream(ResultSet resultSet) {
         return StreamSupport.stream(resultSet.spliterator(), true)
             .peek(row -> ensureFetchedNextPage(resultSet));
     }
 
-    private static void ensureFetchedNextPage(ResultSet resultSet) {
-        if (resultSet.getAvailableWithoutFetching() == 
FETCH_NEXT_PAGE_ADVANCE_IN_ROW && !resultSet.isFullyFetched()) {
+    private void ensureFetchedNextPage(ResultSet resultSet) {
+        if (fetchNeeded(resultSet)) {
             resultSet.fetchMoreResults();
         }
     }
 
+    private boolean fetchNeeded(ResultSet resultSet) {
+        return resultSet.getAvailableWithoutFetching() == 
cassandraConfiguration.getFetchNextPageInAdvanceRow()
+            && !resultSet.isFullyFetched();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
new file mode 100644
index 0000000..dec62ea
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
@@ -0,0 +1,225 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+public class CassandraConfigurationTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void cassandraConfigurationShouldRespectBeanContract() {
+        EqualsVerifier.forClass(CassandraConfiguration.class).verify();
+    }
+
+    @Test
+    public void defaultBuilderShouldConstructDefaultConfiguration() {
+        assertThat(CassandraConfiguration.builder().build())
+            .isEqualTo(CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
+    @Test
+    public void aclMaxRetryShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .aclMaxRetry(-1);
+    }
+
+    @Test
+    public void aclMaxRetryShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .aclMaxRetry(0);
+    }
+
+    @Test
+    public void expungeChunkSizeShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .expungeChunkSize(-1);
+    }
+
+    @Test
+    public void expungeChunkSizeShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .expungeChunkSize(0);
+    }
+
+    @Test
+    public void messageReadChunkSizeShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .messageReadChunkSize(-1);
+    }
+
+    @Test
+    public void messageReadChunkSizeShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .messageReadChunkSize(0);
+    }
+
+    @Test
+    public void flagsUpdateChunkSizeShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateChunkSize(-1);
+    }
+
+    @Test
+    public void flagsUpdateChunkSizeShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateChunkSize(0);
+    }
+
+    @Test
+    public void flagsUpdateMessageIdMaxRetryShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateMessageIdMaxRetry(-1);
+    }
+
+    @Test
+    public void flagsUpdateMessageIdMaxRetryShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateMessageIdMaxRetry(0);
+    }
+
+    @Test
+    public void flagsUpdateMessageMaxRetryShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateMessageMaxRetry(-1);
+    }
+
+    @Test
+    public void flagsUpdateMessageMaxRetryShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .flagsUpdateMessageMaxRetry(0);
+    }
+
+    @Test
+    public void fetchNextPageInAdvanceRowShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .fetchNextPageInAdvanceRow(-1);
+    }
+
+    @Test
+    public void fetchNextPageInAdvanceRowShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .fetchNextPageInAdvanceRow(0);
+    }
+
+    @Test
+    public void modSeqMaxRetryShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .modSeqMaxRetry(-1);
+    }
+
+    @Test
+    public void modSeqMaxRetryShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .modSeqMaxRetry(0);
+    }
+
+    @Test
+    public void uidMaxRetryShouldThrowOnNegativeValue() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .uidMaxRetry(-1);
+    }
+
+    @Test
+    public void uidMaxRetryShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .uidMaxRetry(0);
+    }
+
+    @Test
+    public void builderShouldCreateTheRightObject() {
+        int aclMaxRetry = 1;
+        int modSeqMaxRetry = 2;
+        int uidMaxRetry = 3;
+        int fetchNextPageInAdvanceRow = 4;
+        int flagsUpdateMessageMaxRetry = 5;
+        int flagsUpdateMessageIdMaxRetry = 6;
+        int flagsUpdateChunkSize = 7;
+        int messageReadChunkSize = 8;
+        int expungeChunkSize = 9;
+
+        CassandraConfiguration configuration = CassandraConfiguration.builder()
+            .aclMaxRetry(aclMaxRetry)
+            .modSeqMaxRetry(modSeqMaxRetry)
+            .uidMaxRetry(uidMaxRetry)
+            .fetchNextPageInAdvanceRow(fetchNextPageInAdvanceRow)
+            .flagsUpdateMessageMaxRetry(flagsUpdateMessageMaxRetry)
+            .flagsUpdateMessageIdMaxRetry(flagsUpdateMessageIdMaxRetry)
+            .flagsUpdateChunkSize(flagsUpdateChunkSize)
+            .messageReadChunkSize(messageReadChunkSize)
+            .expungeChunkSize(expungeChunkSize)
+            .build();
+
+        assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
+        
assertThat(configuration.getModSeqMaxRetry()).isEqualTo(modSeqMaxRetry);
+        assertThat(configuration.getUidMaxRetry()).isEqualTo(uidMaxRetry);
+        
assertThat(configuration.getFetchNextPageInAdvanceRow()).isEqualTo(fetchNextPageInAdvanceRow);
+        
assertThat(configuration.getFlagsUpdateMessageMaxRetry()).isEqualTo(flagsUpdateMessageMaxRetry);
+        
assertThat(configuration.getFlagsUpdateMessageIdMaxRetry()).isEqualTo(flagsUpdateMessageIdMaxRetry);
+        
assertThat(configuration.getFlagsUpdateChunkSize()).isEqualTo(flagsUpdateChunkSize);
+        
assertThat(configuration.getMessageReadChunkSize()).isEqualTo(messageReadChunkSize);
+        
assertThat(configuration.getExpungeChunkSize()).isEqualTo(expungeChunkSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 36f536a..dc0d622 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -20,8 +20,9 @@
 package org.apache.james.mailbox.cassandra;
 
 import javax.inject.Inject;
-import javax.inject.Named;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.mail.CassandraAnnotationMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraApplicableFlagDAO;
@@ -59,8 +60,6 @@ import com.datastax.driver.core.Session;
  * 
  */
 public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFactory {
-    public static final Integer DEFAULT_MAX_RETRY = 1000;
-
     private final Session session;
     private final CassandraUidProvider uidProvider;
     private final CassandraModSeqProvider modSeqProvider;
@@ -74,15 +73,16 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final CassandraMailboxPathDAO mailboxPathDAO;
     private final CassandraFirstUnseenDAO firstUnseenDAO;
     private final CassandraApplicableFlagDAO applicableFlagDAO;
+    private CassandraUtils cassandraUtils;
+    private CassandraConfiguration cassandraConfiguration;
     private final CassandraDeletedMessageDAO deletedMessageDAO;
-    private int maxRetry;
 
     @Inject
     public CassandraMailboxSessionMapperFactory(CassandraUidProvider 
uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
                                                 CassandraMessageDAO 
messageDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO 
imapUidDAO,
                                                 CassandraMailboxCounterDAO 
mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, 
CassandraMailboxDAO mailboxDAO,
                                                 CassandraMailboxPathDAO 
mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
-                                                CassandraDeletedMessageDAO 
deletedMessageDAO, @Named(CassandraMailboxDAO.MAX_ACL_RETRY) Integer maxRetry) {
+                                                CassandraDeletedMessageDAO 
deletedMessageDAO, CassandraUtils cassandraUtils, CassandraConfiguration 
cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.session = session;
@@ -96,13 +96,14 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         this.firstUnseenDAO = firstUnseenDAO;
         this.deletedMessageDAO = deletedMessageDAO;
         this.applicableFlagDAO = applicableFlagDAO;
+        this.cassandraUtils = cassandraUtils;
+        this.cassandraConfiguration = cassandraConfiguration;
         this.indexTableHandler = new CassandraIndexTableHandler(
             mailboxRecentsDAO,
             mailboxCounterDAO,
             firstUnseenDAO,
             applicableFlagDAO,
             deletedMessageDAO);
-        this.maxRetry = maxRetry;
     }
 
     public CassandraMailboxSessionMapperFactory(
@@ -121,7 +122,8 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         CassandraDeletedMessageDAO deletedMesageDAO) {
 
         this(uidProvider, modSeqProvider, session, messageDAO, messageIdDAO, 
imapUidDAO, mailboxCounterDAO,
-             mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, 
applicableFlagDAO, deletedMesageDAO, DEFAULT_MAX_RETRY);
+             mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, 
applicableFlagDAO, deletedMesageDAO,
+            CassandraUtils.DEFAULT_CASSANDRA_UTILS, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
     @Override
@@ -130,7 +132,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                           uidProvider,
                                           modSeqProvider,
                                           null,
-                                          maxRetry,
                                           (CassandraAttachmentMapper) 
createAttachmentMapper(mailboxSession),
                                           messageDAO,
                                           messageIdDAO,
@@ -140,19 +141,20 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                           applicableFlagDAO,
                                           indexTableHandler,
                                           firstUnseenDAO,
-                                          deletedMessageDAO);
+                                          deletedMessageDAO,
+                                          cassandraConfiguration);
     }
 
     @Override
     public MessageIdMapper createMessageIdMapper(MailboxSession 
mailboxSession) throws MailboxException {
         return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), 
mailboxDAO,
                 (CassandraAttachmentMapper) 
getAttachmentMapper(mailboxSession),
-                imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, 
modSeqProvider, mailboxSession);
+                imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, 
modSeqProvider, mailboxSession, cassandraConfiguration);
     }
 
     @Override
     public MailboxMapper createMailboxMapper(MailboxSession mailboxSession) {
-        return new CassandraMailboxMapper(session, mailboxDAO, mailboxPathDAO, 
maxRetry);
+        return new CassandraMailboxMapper(session, mailboxDAO, mailboxPathDAO, 
cassandraConfiguration);
     }
 
     @Override
@@ -162,7 +164,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
 
     @Override
     public SubscriptionMapper createSubscriptionMapper(MailboxSession 
mailboxSession) {
-        return new CassandraSubscriptionMapper(session);
+        return new CassandraSubscriptionMapper(session, cassandraUtils);
     }
 
     public ModSeqProvider getModSeqProvider() {
@@ -180,6 +182,6 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
     @Override
     public AnnotationMapper createAnnotationMapper(MailboxSession 
mailboxSession)
             throws MailboxException {
-        return new CassandraAnnotationMapper(session);
+        return new CassandraAnnotationMapper(session, cassandraUtils);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
index fd648f5..d1b83ef 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
@@ -44,11 +44,12 @@ public class CassandraMailboxPathRegisterMapper implements 
DistantMailboxPathReg
     private final Session session;
     private final CassandraTypesProvider typesProvider;
     private final int cassandraTimeOutInS;
+    private final CassandraUtils cassandraUtils;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement selectStatement;
 
-    public CassandraMailboxPathRegisterMapper(Session session, 
CassandraTypesProvider typesProvider, int cassandraTimeOutInS) {
+    public CassandraMailboxPathRegisterMapper(Session session, 
CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils, int 
cassandraTimeOutInS) {
         this.session = session;
         this.typesProvider = typesProvider;
         this.cassandraTimeOutInS = cassandraTimeOutInS;
@@ -61,11 +62,12 @@ public class CassandraMailboxPathRegisterMapper implements 
DistantMailboxPathReg
             .and(eq(CassandraMailboxPathRegisterTable.TOPIC, bindMarker())));
         this.selectStatement = 
session.prepare(select().from(CassandraMailboxPathRegisterTable.TABLE_NAME)
             .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, 
bindMarker())));
+        this.cassandraUtils = cassandraUtils;
     }
 
     @Override
     public Set<Topic> getTopics(MailboxPath mailboxPath) {
-        return 
CassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath))))
+        return 
cassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath))))
             .map(row -> new 
Topic(row.getString(CassandraMailboxPathRegisterTable.TOPIC)))
             .collect(Collectors.toSet());
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 68698ca..7e1626d 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.CassandraConstants;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
@@ -66,17 +67,16 @@ public class CassandraACLMapper {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CassandraACLMapper.class);
 
-    public CassandraACLMapper(CassandraId cassandraId, Session session, 
CassandraAsyncExecutor cassandraAsyncExecutor, int maxRetry) {
-        this(cassandraId, session, cassandraAsyncExecutor, maxRetry, () -> {});
+    public CassandraACLMapper(CassandraId cassandraId, Session session, 
CassandraAsyncExecutor cassandraAsyncExecutor, CassandraConfiguration 
cassandraConfiguration) {
+        this(cassandraId, session, cassandraAsyncExecutor, 
cassandraConfiguration, () -> {});
     }
 
-    public CassandraACLMapper(CassandraId cassandraId, Session session, 
CassandraAsyncExecutor cassandraAsyncExecutor, int maxRetry, CodeInjector 
codeInjector) {
-        Preconditions.checkArgument(maxRetry > 0);
+    public CassandraACLMapper(CassandraId cassandraId, Session session, 
CassandraAsyncExecutor cassandraAsyncExecutor, CassandraConfiguration 
cassandraConfiguration, CodeInjector codeInjector) {
         Preconditions.checkArgument(cassandraId != null);
         this.cassandraId = cassandraId;
         this.session = session;
         this.executor = cassandraAsyncExecutor;
-        this.maxRetry = maxRetry;
+        this.maxRetry = cassandraConfiguration.getAclMaxRetry();
         this.codeInjector = codeInjector;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
index bf8da4f..073227b 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
@@ -28,13 +28,12 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.lte;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.github.steveash.guavate.Guavate;
-import com.google.common.base.Ascii;
-import com.google.common.base.Optional;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraAnnotationTable;
@@ -47,26 +46,30 @@ import 
org.apache.james.mailbox.store.transaction.NonTransactionalMapper;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Select;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Ascii;
 import com.google.common.base.Preconditions;
 
 public class CassandraAnnotationMapper extends NonTransactionalMapper 
implements AnnotationMapper {
 
     private final Session session;
+    private final CassandraUtils cassandraUtils;
 
-    public CassandraAnnotationMapper(Session session) {
+    public CassandraAnnotationMapper(Session session, CassandraUtils 
cassandraUtils) {
         this.session = session;
+        this.cassandraUtils = cassandraUtils;
     }
 
     public List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId) {
         CassandraId cassandraId = (CassandraId)mailboxId;
-        return 
CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQuery(cassandraId)))
+        return 
cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQuery(cassandraId)))
             .map(this::toAnnotation)
             .collect(Collectors.toList());
     }
 
     public List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
         CassandraId cassandraId = (CassandraId)mailboxId;
-        return 
CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryForKeys(cassandraId,
 keys)))
+        return 
cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryForKeys(cassandraId,
 keys)))
             .map(this::toAnnotation)
             .collect(Collectors.toList());
     }
@@ -102,8 +105,11 @@ public class CassandraAnnotationMapper extends 
NonTransactionalMapper implements
     @Override
     public boolean exist(MailboxId mailboxId, MailboxAnnotation 
mailboxAnnotation) {
         CassandraId cassandraId = (CassandraId)mailboxId;
-        Optional<Row> row = 
Optional.fromNullable(session.execute(getStoredAnnotationsQueryByKey(cassandraId,
 mailboxAnnotation.getKey().asString()))
-            .one());
+        Optional<Row> row = Optional.ofNullable(
+            session.execute(
+                getStoredAnnotationsQueryByKey(cassandraId,
+                    mailboxAnnotation.getKey().asString()))
+                .one());
         return row.isPresent();
     }
 
@@ -146,13 +152,13 @@ public class CassandraAnnotationMapper extends 
NonTransactionalMapper implements
     }
 
     private Stream<MailboxAnnotation> 
getAnnotationsByKeyWithAllDepth(CassandraId mailboxId, MailboxAnnotationKey 
key) {
-        return 
CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId,
 key.asString())))
+        return 
cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId,
 key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> 
key.isAncestorOrIsEqual(annotation.getKey()));
     }
 
     private Stream<MailboxAnnotation> 
getAnnotationsByKeyWithOneDepth(CassandraId mailboxId, MailboxAnnotationKey 
key) {
-        return 
CassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId,
 key.asString())))
+        return 
cassandraUtils.convertToStream(session.execute(getStoredAnnotationsQueryLikeKey(mailboxId,
 key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> key.isParentOrIsEqual(annotation.getKey()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 10dabc6..c86d1e0 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -43,6 +43,8 @@ import org.apache.james.mailbox.cassandra.CassandraId;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.james.mailbox.model.MessageRange;
 
 public class CassandraDeletedMessageDAO {
@@ -57,9 +59,10 @@ public class CassandraDeletedMessageDAO {
     private final PreparedStatement selectOneUidStatement;
     private final PreparedStatement selectBetweenUidStatement;
     private final PreparedStatement selectFromUidStatement;
+    private final CassandraUtils cassandraUtils;
 
     @Inject
-    public CassandraDeletedMessageDAO(Session session) {
+    public CassandraDeletedMessageDAO(Session session, CassandraUtils 
cassandraUtils) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.addStatement = prepareAddStatement(session);
         this.deleteStatement = prepareDeleteStatement(session);
@@ -67,6 +70,12 @@ public class CassandraDeletedMessageDAO {
         this.selectOneUidStatement = prepareOneUidStatement(session);
         this.selectBetweenUidStatement = prepareBetweenUidStatement(session);
         this.selectFromUidStatement = prepareFromUidStatement(session);
+        this.cassandraUtils = cassandraUtils;
+    }
+
+    @VisibleForTesting
+    public CassandraDeletedMessageDAO(Session session) {
+        this(session, CassandraUtils.DEFAULT_CASSANDRA_UTILS);
     }
 
     private PreparedStatement prepareAllUidStatement(Session session) {
@@ -125,7 +134,7 @@ public class CassandraDeletedMessageDAO {
 
     public CompletableFuture<Stream<MessageUid>> 
retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) {
         return retrieveResultSetOfDeletedMessage(cassandraId, range)
-            .thenApply(CassandraDeletedMessageDAO::resultSetToStream);
+            .thenApply(this::resultSetToStream);
     }
 
     private CompletableFuture<ResultSet> 
retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
@@ -143,8 +152,8 @@ public class CassandraDeletedMessageDAO {
         throw new UnsupportedOperationException();
     }
 
-    private static Stream<MessageUid> resultSetToStream(ResultSet resultSet) {
-        return CassandraUtils.convertToStream(resultSet)
+    private Stream<MessageUid> resultSetToStream(ResultSet resultSet) {
+        return cassandraUtils.convertToStream(resultSet)
             .map(row ->
                 MessageUid.of(row.getLong(UID)));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 3f6f86c..ca6cfd7 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
@@ -55,14 +56,15 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.annotations.VisibleForTesting;
 
 public class CassandraMailboxDAO {
 
-    public static final String MAX_ACL_RETRY = "maxAclRetry";
     private final CassandraAsyncExecutor executor;
     private final MailboxBaseTupleUtil mailboxBaseTupleUtil;
     private final Session session;
-    private final int maxAclRetry;
+    private final CassandraUtils cassandraUtils;
+    private CassandraConfiguration cassandraConfiguration;
     private final PreparedStatement readStatement;
     private final PreparedStatement listStatement;
     private final PreparedStatement deleteStatement;
@@ -70,7 +72,7 @@ public class CassandraMailboxDAO {
     private final PreparedStatement updateStatement;
 
     @Inject
-    public CassandraMailboxDAO(Session session, CassandraTypesProvider 
typesProvider, @Named(MAX_ACL_RETRY) Integer maxAclRetry) {
+    public CassandraMailboxDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraUtils cassandraUtils, CassandraConfiguration 
cassandraConfiguration) {
         this.executor = new CassandraAsyncExecutor(session);
         this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider);
         this.session = session;
@@ -79,7 +81,13 @@ public class CassandraMailboxDAO {
         this.deleteStatement = prepareDelete(session);
         this.listStatement = prepareList(session);
         this.readStatement = prepareRead(session);
-        this.maxAclRetry = maxAclRetry;
+        this.cassandraUtils = cassandraUtils;
+        this.cassandraConfiguration = cassandraConfiguration;
+    }
+
+    @VisibleForTesting
+    public CassandraMailboxDAO(Session session, CassandraTypesProvider 
typesProvider) {
+        this(session, typesProvider, CassandraUtils.DEFAULT_CASSANDRA_UTILS, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
     private PreparedStatement prepareInsert(Session session) {
@@ -140,7 +148,7 @@ public class CassandraMailboxDAO {
     }
 
     private CompletableFuture<Optional<SimpleMailbox>> mailbox(CassandraId 
cassandraId, CompletableFuture<Optional<Row>> rowFuture) {
-        CompletableFuture<MailboxACL> aclCompletableFuture = new 
CassandraACLMapper(cassandraId, session, executor, maxAclRetry).getACL();
+        CompletableFuture<MailboxACL> aclCompletableFuture = new 
CassandraACLMapper(cassandraId, session, executor, 
cassandraConfiguration).getACL();
         return rowFuture.thenApply(rowOptional -> 
rowOptional.map(this::mailboxFromRow))
             .thenApply(mailboxOptional -> {
                 mailboxOptional.ifPresent(mailbox -> 
mailbox.setMailboxId(cassandraId));
@@ -163,7 +171,7 @@ public class CassandraMailboxDAO {
 
     public CompletableFuture<Stream<SimpleMailbox>> retrieveAllMailboxes() {
         return executor.execute(listStatement.bind())
-            .thenApply(CassandraUtils::convertToStream)
+            .thenApply(cassandraUtils::convertToStream)
             .thenApply(stream -> stream.map(this::toMailboxWithId))
             .thenCompose(stream -> 
CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture)));
     }
@@ -176,7 +184,7 @@ public class CassandraMailboxDAO {
 
     private CompletableFuture<SimpleMailbox> 
toMailboxWithAclFuture(SimpleMailbox mailbox) {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
-        return new CassandraACLMapper(cassandraId, session, executor, 
maxAclRetry).getACL()
+        return new CassandraACLMapper(cassandraId, session, executor, 
cassandraConfiguration).getACL()
             .thenApply(acl -> {
                 mailbox.setACL(acl);
                 return mailbox;

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index 6c0d484..ef0362a 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.exception.MailboxException;
@@ -55,18 +56,18 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
     public static final String VALUES_MAY_NOT_BE_LARGER_THAN_64_K = "Index 
expression values may not be larger than 64K";
     public static final String CLUSTERING_COLUMNS_IS_TOO_LONG = "The sum of 
all clustering columns is too long";
 
-    private final int maxRetry;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraMailboxPathDAO mailboxPathDAO;
     private final CassandraMailboxDAO mailboxDAO;
     private final Session session;
+    private final CassandraConfiguration cassandraConfiguration;
 
-    public CassandraMailboxMapper(Session session, CassandraMailboxDAO 
mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, int maxRetry) {
-        this.maxRetry = maxRetry;
+    public CassandraMailboxMapper(Session session, CassandraMailboxDAO 
mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, CassandraConfiguration 
cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.mailboxDAO = mailboxDAO;
         this.mailboxPathDAO = mailboxPathDAO;
         this.session = session;
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     @Override
@@ -193,7 +194,7 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
     @Override
     public void updateACL(Mailbox mailbox, MailboxACL.MailboxACLCommand 
mailboxACLCommand) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
-        new CassandraACLMapper(cassandraId, session, cassandraAsyncExecutor, 
maxRetry).updateACL(mailboxACLCommand);
+        new CassandraACLMapper(cassandraId, session, cassandraAsyncExecutor, 
cassandraConfiguration).updateACL(mailboxACLCommand);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
index 552e010..15d4a94 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
@@ -47,6 +47,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
 public class CassandraMailboxPathDAO {
@@ -87,21 +88,28 @@ public class CassandraMailboxPathDAO {
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final MailboxBaseTupleUtil mailboxBaseTupleUtil;
+    private final CassandraUtils cassandraUtils;
     private final PreparedStatement delete;
     private final PreparedStatement insert;
     private final PreparedStatement select;
     private final PreparedStatement selectAll;
 
     @Inject
-    public CassandraMailboxPathDAO(Session session, CassandraTypesProvider 
typesProvider) {
+    public CassandraMailboxPathDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraUtils cassandraUtils) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider);
+        this.cassandraUtils = cassandraUtils;
         this.insert = prepareInsert(session);
         this.delete = prepareDelete(session);
         this.select = prepareSelect(session);
         this.selectAll = prepareSelectAll(session);
     }
 
+    @VisibleForTesting
+    public CassandraMailboxPathDAO(Session session, CassandraTypesProvider 
typesProvider) {
+        this(session, typesProvider, CassandraUtils.DEFAULT_CASSANDRA_UTILS);
+    }
+
     private PreparedStatement prepareDelete(Session session) {
         return session.prepare(QueryBuilder.delete()
             .from(TABLE_NAME)
@@ -145,7 +153,7 @@ public class CassandraMailboxPathDAO {
         return cassandraAsyncExecutor.execute(
             selectAll.bind()
                 .setUDTValue(NAMESPACE_AND_USER, 
mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user)))
-            .thenApply(resultSet -> 
CassandraUtils.convertToStream(resultSet).map(this::fromRowToCassandraIdAndPath));
+            .thenApply(resultSet -> 
cassandraUtils.convertToStream(resultSet).map(this::fromRowToCassandraIdAndPath));
     }
 
     private CassandraIdAndPath fromRowToCassandraIdAndPath(Row row) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index f4eddf9..f3264f4 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -39,6 +39,7 @@ import 
org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.annotations.VisibleForTesting;
 
 public class CassandraMailboxRecentsDAO {
 
@@ -46,13 +47,20 @@ public class CassandraMailboxRecentsDAO {
     private final PreparedStatement readStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement addStatement;
+    private CassandraUtils cassandraUtils;
 
     @Inject
-    public CassandraMailboxRecentsDAO(Session session) {
+    public CassandraMailboxRecentsDAO(Session session, CassandraUtils 
cassandraUtils) {
         cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         readStatement = createReadStatement(session);
         deleteStatement = createDeleteStatement(session);
         addStatement = createAddStatement(session);
+        this.cassandraUtils = cassandraUtils;
+    }
+
+    @VisibleForTesting
+    public CassandraMailboxRecentsDAO(Session session) {
+        this(session, CassandraUtils.DEFAULT_CASSANDRA_UTILS);
     }
 
     private PreparedStatement createReadStatement(Session session) {
@@ -79,7 +87,7 @@ public class CassandraMailboxRecentsDAO {
 
     public CompletableFuture<Stream<MessageUid>> 
getRecentMessageUidsInMailbox(CassandraId mailboxId) {
         return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, 
readStatement))
-            .thenApply(CassandraUtils::convertToStream)
+            .thenApply(cassandraUtils::convertToStream)
             .thenApply(stream -> stream.map(row -> 
row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)))
             .thenApply(stream -> stream.map(MessageUid::of));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index b5101c3..9dd3d21 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -55,6 +55,7 @@ import javax.mail.Flags;
 import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.MessageUid;
@@ -93,8 +94,6 @@ import com.google.common.io.ByteStreams;
 import com.google.common.primitives.Bytes;
 
 public class CassandraMessageDAO {
-
-    public static final int CHUNK_SIZE_ON_READ = 100;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraTypesProvider typesProvider;
     private final PreparedStatement insert;
@@ -103,9 +102,10 @@ public class CassandraMessageDAO {
     private final PreparedStatement selectHeaders;
     private final PreparedStatement selectFields;
     private final PreparedStatement selectBody;
+    private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
-    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider) {
+    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraConfiguration cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.typesProvider = typesProvider;
         this.insert = prepareInsert(session);
@@ -114,6 +114,12 @@ public class CassandraMessageDAO {
         this.selectHeaders = prepareSelect(session, HEADERS);
         this.selectFields = prepareSelect(session, FIELDS);
         this.selectBody = prepareSelect(session, BODY);
+        this.cassandraConfiguration = cassandraConfiguration;
+    }
+
+    @VisibleForTesting
+    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider) {
+        this(session, typesProvider, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
     private PreparedStatement prepareSelect(Session session, String[] fields) {
@@ -193,7 +199,7 @@ public class CassandraMessageDAO {
     public CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>> 
retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType 
fetchType, Optional<Integer> limit) {
         return CompletableFutureUtil.chainAll(
             getLimitedIdStream(messageIds.stream().distinct(), limit)
-                .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)),
+                
.collect(JamesCollectors.chunker(cassandraConfiguration.getMessageReadChunkSize())),
             ids -> FluentFutureStream.of(
                 ids.stream()
                     .map(id -> retrieveRow(id, fetchType)

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index e7d5a34..6f9c265 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -65,6 +65,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 public class CassandraMessageIdDAO {
@@ -80,10 +81,11 @@ public class CassandraMessageIdDAO {
     private final PreparedStatement selectAllUids;
     private final PreparedStatement selectUidGte;
     private final PreparedStatement selectUidRange;
+    private CassandraUtils cassandraUtils;
     private final PreparedStatement update;
 
     @Inject
-    public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory) {
+    public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory, CassandraUtils cassandraUtils) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.messageIdFactory = messageIdFactory;
         this.delete = prepareDelete(session);
@@ -93,6 +95,12 @@ public class CassandraMessageIdDAO {
         this.selectAllUids = prepareSelectAllUids(session);
         this.selectUidGte = prepareSelectUidGte(session);
         this.selectUidRange = prepareSelectUidRange(session);
+        this.cassandraUtils = cassandraUtils;
+    }
+
+    @VisibleForTesting
+    public CassandraMessageIdDAO(Session session, CassandraMessageId.Factory messageIdFactory) {
+        this(session, messageIdFactory, CassandraUtils.DEFAULT_CASSANDRA_UTILS);
     }
 
     private PreparedStatement prepareDelete(Session session) {
@@ -253,7 +261,7 @@ public class CassandraMessageIdDAO {
 
     private CompletableFuture<Stream<ComposedMessageIdWithMetaData>> toMessageIds(CompletableFuture<ResultSet> completableFuture) {
         return completableFuture
-            .thenApply(resultSet -> CassandraUtils.convertToStream(resultSet)
+            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
                 .map(this::fromRowToComposedMessageIdWithFlags));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index bf3dc1d..91e7b3a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
 import javax.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.MailboxSession;
@@ -60,8 +61,6 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Throwables;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
-
-    private static final int MAX_RETRY = 1000;
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
 
     private final MailboxMapper mailboxMapper;
@@ -73,10 +72,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private final ModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
     private final AttachmentLoader attachmentLoader;
+    private final CassandraConfiguration cassandraConfiguration;
 
     public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
-                                    CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) {
+                                    CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession, CassandraConfiguration cassandraConfiguration) {
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
         this.imapUidDAO = imapUidDAO;
@@ -86,6 +86,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     @Override
@@ -223,7 +224,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     private Stream<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         try {
-            Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(MAX_RETRY)
+            Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
                 .executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId));
             ComposedMessageIdWithMetaData composedMessageIdWithMetaData = pair.getRight();
             Flags oldFlags = pair.getLeft();

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 3e1bbbd..38a02d6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -62,6 +62,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 public class CassandraMessageIdToImapUidDAO {
@@ -75,9 +76,10 @@ public class CassandraMessageIdToImapUidDAO {
     private final PreparedStatement update;
     private final PreparedStatement selectAll;
     private final PreparedStatement select;
+    private CassandraUtils cassandraUtils;
 
     @Inject
-    public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory) {
+    public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory, CassandraUtils cassandraUtils) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.messageIdFactory = messageIdFactory;
         this.delete = prepareDelete(session);
@@ -85,6 +87,12 @@ public class CassandraMessageIdToImapUidDAO {
         this.update = prepareUpdate(session);
         this.selectAll = prepareSelectAll(session);
         this.select = prepareSelect(session);
+        this.cassandraUtils = cassandraUtils;
+    }
+
+    @VisibleForTesting
+    public CassandraMessageIdToImapUidDAO(Session session, CassandraMessageId.Factory messageIdFactory) {
+        this(session, messageIdFactory, CassandraUtils.DEFAULT_CASSANDRA_UTILS);
     }
 
     private PreparedStatement prepareDelete(Session session) {
@@ -185,7 +193,7 @@ public class CassandraMessageIdToImapUidDAO {
 
     public CompletableFuture<Stream<ComposedMessageIdWithMetaData>> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
         return selectStatement(messageId, mailboxId)
-                .thenApply(resultSet -> CassandraUtils.convertToStream(resultSet)
+                .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
                         .map(this::toComposedMessageIdWithMetadata));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 2a6e74d..55bf43d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -32,6 +32,7 @@ import javax.mail.Flags;
 import javax.mail.Flags.Flag;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.mailbox.ApplicableFlagBuilder;
 import org.apache.james.mailbox.FlagsBuilder;
 import org.apache.james.mailbox.MailboxSession;
@@ -67,14 +68,11 @@ public class CassandraMessageMapper implements MessageMapper {
         .count(0L)
         .unseen(0L)
         .build();
-    public static final int EXPUNGE_BATCH_SIZE = 100;
-    public static final int UPDATE_FLAGS_BATCH_SIZE = 20;
     public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);
 
     private final CassandraModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
     private final CassandraUidProvider uidProvider;
-    private final int maxRetries;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
@@ -85,16 +83,16 @@ public class CassandraMessageMapper implements MessageMapper {
     private final CassandraFirstUnseenDAO firstUnseenDAO;
     private final AttachmentLoader attachmentLoader;
     private final CassandraDeletedMessageDAO deletedMessageDAO;
+    private final CassandraConfiguration cassandraConfiguration;
 
     public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
-                                  MailboxSession mailboxSession, int maxRetries, CassandraAttachmentMapper attachmentMapper,
+                                  MailboxSession mailboxSession, CassandraAttachmentMapper attachmentMapper,
                                   CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
                                   CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
-                                  CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO) {
+                                  CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
-        this.maxRetries = maxRetries;
         this.messageDAO = messageDAO;
         this.messageIdDAO = messageIdDAO;
         this.imapUidDAO = imapUidDAO;
@@ -105,6 +103,7 @@ public class CassandraMessageMapper implements MessageMapper {
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.applicableFlagDAO = applicableFlagDAO;
         this.deletedMessageDAO = deletedMessageDAO;
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     @Override
@@ -230,7 +229,7 @@ public class CassandraMessageMapper implements MessageMapper {
 
         return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
             .join()
-            .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE))
+            .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize()))
             .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
             .flatMap(CompletableFuture::join)
             .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new));
@@ -303,7 +302,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) {
         FlagsUpdateStageResult globalResult = firstResult;
         int retryCount = 0;
-        while (retryCount < maxRetries && globalResult.containsFailedResults()) {
+        while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) {
             retryCount++;
             FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed());
             globalResult = globalResult.keepSucceded().merge(stageResult);
@@ -323,7 +322,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
         Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()));
 
-        return toBeUpdated.collect(JamesCollectors.chunker(UPDATE_FLAGS_BATCH_SIZE))
+        return toBeUpdated.collect(JamesCollectors.chunker(cassandraConfiguration.getFlagsUpdateChunkSize()))
             .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk))
             .map(CompletableFuture::join)
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge);

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 1a6a885..986fc74 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletionException;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.MailboxSession;
@@ -46,6 +47,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 
 public class CassandraModSeqProvider implements ModSeqProvider {
@@ -76,7 +78,6 @@ public class CassandraModSeqProvider implements ModSeqProvider {
         }
     }
 
-    private static final int DEFAULT_MAX_RETRY = 100000;
     private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
@@ -85,14 +86,20 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     private final PreparedStatement update;
     private final PreparedStatement insert;
 
-    public CassandraModSeqProvider(Session session, int maxRetry) {
+    @Inject
+    public CassandraModSeqProvider(Session session, CassandraConfiguration cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.runner = new FunctionRunnerWithRetry(maxRetry);
+        this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getModSeqMaxRetry());
         this.insert = prepareInsert(session);
         this.update = prepareUpdate(session);
         this.select = prepareSelect(session);
     }
 
+    @VisibleForTesting
+    public CassandraModSeqProvider(Session session) {
+        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
     private PreparedStatement prepareInsert(Session session) {
         return session.prepare(insertInto(TABLE_NAME)
             .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))
@@ -113,10 +120,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
-    @Inject
-    public CassandraModSeqProvider(Session session) {
-        this(session, DEFAULT_MAX_RETRY);
-    }
+
 
     @Override
     public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index 4aeb483..8f738f4 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.MailboxSession;
@@ -47,9 +48,9 @@ import org.apache.james.util.OptionalConverter;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.annotations.VisibleForTesting;
 
 public class CassandraUidProvider implements UidProvider {
-    private static final int DEFAULT_MAX_RETRY = 100000;
     private static final String CONDITION = "Condition";
 
     private final CassandraAsyncExecutor executor;
@@ -58,14 +59,20 @@ public class CassandraUidProvider implements UidProvider {
     private final PreparedStatement updateStatement;
     private final PreparedStatement selectStatement;
 
-    public CassandraUidProvider(Session session, int maxRetry) {
+    @Inject
+    public CassandraUidProvider(Session session, CassandraConfiguration cassandraConfiguration) {
         this.executor = new CassandraAsyncExecutor(session);
-        this.runner = new FunctionRunnerWithRetry(maxRetry);
+        this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getUidMaxRetry());
         this.selectStatement = prepareSelect(session);
         this.updateStatement = prepareUpdate(session);
         this.insertStatement = prepareInsert(session);
     }
 
+    @VisibleForTesting
+    public CassandraUidProvider(Session session) {
+        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select(NEXT_UID)
             .from(TABLE_NAME)
@@ -86,11 +93,6 @@ public class CassandraUidProvider implements UidProvider {
             .ifNotExists());
     }
 
-    @Inject
-    public CassandraUidProvider(Session session) {
-        this(session, DEFAULT_MAX_RETRY);
-    }
-
     @Override
     public MessageUid nextUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
         return nextUid(mailboxSession, mailbox.getMailboxId());

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java
index 4f78655..a1ce340 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/user/CassandraSubscriptionMapper.java
@@ -42,9 +42,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
 
 public class CassandraSubscriptionMapper extends NonTransactionalMapper implements SubscriptionMapper {
     private final Session session;
+    private CassandraUtils cassandraUtils;
 
-    public CassandraSubscriptionMapper(Session session) {
+    public CassandraSubscriptionMapper(Session session, CassandraUtils cassandraUtils) {
         this.session = session;
+        this.cassandraUtils = cassandraUtils;
     }
 
     @Override
@@ -66,7 +68,7 @@ public class CassandraSubscriptionMapper extends NonTransactionalMapper implemen
 
     @Override
     public List<Subscription> findSubscriptionsForUser(String user) {
-        return CassandraUtils.convertToStream(
+        return cassandraUtils.convertToStream(
             session.execute(select(MAILBOX)
                 .from(TABLE_NAME)
                 .where(eq(USER, user))))
@@ -82,7 +84,7 @@ public class CassandraSubscriptionMapper extends NonTransactionalMapper implemen
     }
 
     public List<SimpleSubscription> list() {
-        return CassandraUtils.convertToStream(
+        return cassandraUtils.convertToStream(
             session.execute(select(FIELDS)
                 .from(TABLE_NAME)))
             .map((row) -> new SimpleSubscription(row.getString(USER), row.getString(MAILBOX)))

http://git-wip-us.apache.org/repos/asf/james-project/blob/0daadc62/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index d10b7c3..c34effb 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -48,7 +48,6 @@ import com.google.common.base.Throwables;
 public class CassandraMailboxManagerProvider {
     private static final int LIMIT_ANNOTATIONS = 3;
     private static final int LIMIT_ANNOTATION_SIZE = 30;
-    public static final int MAX_ACL_RETRY = 10;
 
     public static CassandraMailboxManager provideMailboxManager(Session session, CassandraTypesProvider cassandraTypesProvider) {
         CassandraUidProvider uidProvider = new CassandraUidProvider(session);
@@ -59,7 +58,7 @@ public class CassandraMailboxManagerProvider {
         CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, cassandraTypesProvider);
         CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session);
         CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(session);
-        CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider, MAX_ACL_RETRY);
+        CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider);
         CassandraMailboxPathDAO mailboxPathDAO = new CassandraMailboxPathDAO(session, cassandraTypesProvider);
         CassandraFirstUnseenDAO firstUnseenDAO = new CassandraFirstUnseenDAO(session);
         CassandraApplicableFlagDAO applicableFlagDAO = new CassandraApplicableFlagDAO(session);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to