This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 8f10d7abbe JAMES-4177 Finner grain management of Cassandra profiles
(#2951)
8f10d7abbe is described below
commit 8f10d7abbec1fa9f311e4a9cc6c6ed8daaa20ef1
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Feb 27 13:42:36 2026 +0100
JAMES-4177 Finner grain management of Cassandra profiles (#2951)
---
.../components/CassandraQuotaCurrentValueDao.java | 25 ++++++---
.../components/CassandraQuotaLimitDao.java | 18 +++++--
.../backends/cassandra/utils/ProfileLocator.java | 56 ++++++++++++++++++++
.../src/test/resources/cassandra-driver.conf | 29 +++++++++--
.../pages/distributed/configure/cassandra.adoc | 57 ++++++++++++++++++++-
.../mailbox/cassandra/mail/CassandraACLDAOV2.java | 21 ++++++--
.../cassandra/mail/CassandraApplicableFlagDAO.java | 15 ++++--
.../cassandra/mail/CassandraAttachmentDAOV2.java | 16 ++++--
.../cassandra/mail/CassandraDeletedMessageDAO.java | 36 +++++++++----
.../cassandra/mail/CassandraFirstUnseenDAO.java | 39 ++++++++++----
.../cassandra/mail/CassandraMailboxCounterDAO.java | 39 +++++++-------
.../cassandra/mail/CassandraMailboxDAO.java | 21 ++++++--
.../cassandra/mail/CassandraMailboxPathV3DAO.java | 9 +++-
.../cassandra/mail/CassandraMailboxRecentsDAO.java | 6 +++
.../cassandra/mail/CassandraMessageDAOV3.java | 33 +++++++++---
.../cassandra/mail/CassandraMessageIdDAO.java | 47 ++++++++++++-----
.../mail/CassandraMessageIdToImapUidDAO.java | 59 ++++------------------
.../cassandra/mail/CassandraModSeqProvider.java | 13 ++++-
.../mailbox/cassandra/mail/CassandraThreadDAO.java | 17 +++++--
.../cassandra/mail/CassandraThreadLookupDAO.java | 20 ++++++--
.../cassandra/mail/CassandraUidProvider.java | 13 ++++-
.../mail/CassandraUserMailboxRightsDAO.java | 19 +++++--
.../cassandra/CassandraMailboxManagerTest.java | 3 +-
.../cassandra/mail/CassandraMessageDAOV3Test.java | 4 +-
.../sample-configuration/cassandra-driver.conf | 8 ++-
.../domainlist/cassandra/CassandraDomainList.java | 21 +++++---
.../rrt/cassandra/CassandraMappingsSourcesDAO.java | 15 ++++--
.../CassandraRecipientRewriteTableDAO.java | 15 ++++--
.../james/user/cassandra/CassandraUsersDAO.java | 55 ++++++++++++++------
.../CassandraNotificationRegistryDAO.java | 15 ++++--
.../vacation/cassandra/CassandraVacationDAO.java | 13 ++++-
.../CassandraPushSubscriptionDAO.java | 17 +++++--
.../MetaDataFixInconsistenciesServiceTest.java | 3 +-
.../Pop3MetaDataFixInconsistenciesRoutesTest.java | 3 +-
34 files changed, 583 insertions(+), 197 deletions(-)
diff --git
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaCurrentValueDao.java
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaCurrentValueDao.java
index aec997b27b..a835ecff11 100644
---
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaCurrentValueDao.java
+++
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaCurrentValueDao.java
@@ -33,6 +33,7 @@ import static
org.apache.james.backends.cassandra.components.CassandraQuotaCurre
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.quota.QuotaComponent;
import org.apache.james.core.quota.QuotaCurrentValue;
import org.apache.james.core.quota.QuotaType;
@@ -40,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
@@ -59,6 +61,8 @@ public class CassandraQuotaCurrentValueDao {
private final PreparedStatement getQuotaCurrentValueStatement;
private final PreparedStatement getQuotasByComponentStatement;
private final PreparedStatement deleteQuotaCurrentValueStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraQuotaCurrentValueDao(CqlSession session) {
@@ -68,6 +72,8 @@ public class CassandraQuotaCurrentValueDao {
this.getQuotaCurrentValueStatement =
session.prepare(getQuotaCurrentValueStatement().build());
this.getQuotasByComponentStatement =
session.prepare(getQuotasByComponentStatement().build());
this.deleteQuotaCurrentValueStatement =
session.prepare(deleteQuotaCurrentValueStatement().build());
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"CURRENT-QUOTA");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"CURRENT-QUOTA");
}
public Mono<Void> increase(QuotaCurrentValue.Key quotaKey, long amount) {
@@ -75,7 +81,8 @@ public class CassandraQuotaCurrentValueDao {
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
- .setLong(CURRENT_VALUE, amount))
+ .setLong(CURRENT_VALUE, amount)
+ .setExecutionProfile(writeProfile))
.onErrorResume(ex -> {
LOGGER.warn("Failure when increasing {} {} quota for {}. Quota
current value is thus not updated and needs recomputation",
quotaKey.getQuotaComponent().getValue(),
quotaKey.getQuotaType().getValue(), quotaKey.getIdentifier(), ex);
@@ -88,7 +95,8 @@ public class CassandraQuotaCurrentValueDao {
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
- .setLong(CURRENT_VALUE, amount))
+ .setLong(CURRENT_VALUE, amount)
+ .setExecutionProfile(writeProfile))
.onErrorResume(ex -> {
LOGGER.warn("Failure when decreasing {} {} quota for {}. Quota
current value is thus not updated and needs recomputation",
quotaKey.getQuotaComponent().getValue(),
quotaKey.getQuotaType().getValue(), quotaKey.getIdentifier(), ex);
@@ -100,22 +108,25 @@ public class CassandraQuotaCurrentValueDao {
return
queryExecutor.executeSingleRow(getQuotaCurrentValueStatement.bind()
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
- .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()))
- .map(row -> convertRowToModel(row));
+ .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
+ .setExecutionProfile(readProfile))
+ .map(this::convertRowToModel);
}
public Mono<Void> deleteQuotaCurrentValue(QuotaCurrentValue.Key quotaKey) {
return
queryExecutor.executeVoid(deleteQuotaCurrentValueStatement.bind()
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
- .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()));
+ .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
+ .setExecutionProfile(writeProfile));
}
public Flux<QuotaCurrentValue> getQuotasByComponent(QuotaComponent
quotaComponent, String identifier) {
return queryExecutor.executeRows(getQuotasByComponentStatement.bind()
.setString(QUOTA_COMPONENT, quotaComponent.getValue())
- .setString(IDENTIFIER, identifier))
- .map(row -> convertRowToModel(row));
+ .setString(IDENTIFIER, identifier)
+ .setExecutionProfile(readProfile))
+ .map(this::convertRowToModel);
}
private Update increaseStatement() {
diff --git
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaLimitDao.java
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaLimitDao.java
index 2b3090a640..1d613f9555 100644
---
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaLimitDao.java
+++
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/components/CassandraQuotaLimitDao.java
@@ -34,12 +34,14 @@ import static
org.apache.james.backends.cassandra.components.CassandraQuotaLimit
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.quota.QuotaComponent;
import org.apache.james.core.quota.QuotaLimit;
import org.apache.james.core.quota.QuotaScope;
import org.apache.james.core.quota.QuotaType;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
@@ -55,6 +57,8 @@ public class CassandraQuotaLimitDao {
private final PreparedStatement getQuotaLimitsStatement;
private final PreparedStatement setQuotaLimitStatement;
private final PreparedStatement deleteQuotaLimitStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraQuotaLimitDao(CqlSession session) {
@@ -63,6 +67,8 @@ public class CassandraQuotaLimitDao {
this.getQuotaLimitsStatement =
session.prepare(getQuotaLimitsStatement().build());
this.setQuotaLimitStatement =
session.prepare(setQuotaLimitStatement().build());
this.deleteQuotaLimitStatement =
session.prepare((deleteQuotaLimitStatement().build()));
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"QUOTA-LIMITS");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"QUOTA-LIMITS");
}
public Mono<QuotaLimit> getQuotaLimit(QuotaLimit.QuotaLimitKey quotaKey) {
@@ -70,7 +76,8 @@ public class CassandraQuotaLimitDao {
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(QUOTA_SCOPE, quotaKey.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
- .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()))
+ .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
+ .setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}
@@ -78,7 +85,8 @@ public class CassandraQuotaLimitDao {
return queryExecutor.executeRows(getQuotaLimitsStatement.bind()
.setString(QUOTA_COMPONENT, quotaComponent.getValue())
.setString(QUOTA_SCOPE, quotaScope.getValue())
- .setString(IDENTIFIER, identifier))
+ .setString(IDENTIFIER, identifier)
+ .setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}
@@ -88,7 +96,8 @@ public class CassandraQuotaLimitDao {
.setString(QUOTA_SCOPE, quotaLimit.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaLimit.getIdentifier())
.setString(QUOTA_TYPE, quotaLimit.getQuotaType().getValue())
- .set(QUOTA_LIMIT, quotaLimit.getQuotaLimit().orElse(null),
Long.class));
+ .set(QUOTA_LIMIT, quotaLimit.getQuotaLimit().orElse(null),
Long.class)
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> deleteQuotaLimit(QuotaLimit.QuotaLimitKey quotaKey) {
@@ -96,7 +105,8 @@ public class CassandraQuotaLimitDao {
.setString(QUOTA_COMPONENT,
quotaKey.getQuotaComponent().getValue())
.setString(QUOTA_SCOPE, quotaKey.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
- .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()));
+ .setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
+ .setExecutionProfile(writeProfile));
}
private Select getQuotaLimitStatement() {
diff --git
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/ProfileLocator.java
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/ProfileLocator.java
new file mode 100644
index 0000000000..747b2a0a0a
--- /dev/null
+++
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/ProfileLocator.java
@@ -0,0 +1,56 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverConfig;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
+
+public enum ProfileLocator { // Resolves the driver execution profile a DAO should attach to its statements.
+ READ(profileLocatorFunction("READ")), // lookups based on the "READ" profile name
+ WRITE(profileLocatorFunction("WRITE")); // lookups based on the "WRITE" profile name
+
+ private static BiFunction<CqlSession, String, DriverExecutionProfile>
profileLocatorFunction(String baseProfileName) { // builds the lookup function for one base profile name
+ return (session, daoName) -> {
+ DriverConfig config = session.getContext().getConfig();
+ Map<String, ? extends DriverExecutionProfile> profiles =
config.getProfiles();
+ String profileName = baseProfileName + "-" + daoName; // DAO-specific name: base name, a dash, then the DAO name
+
+ return Optional.ofNullable(profiles.get(profileName)) // 1. the DAO-specific profile, when present in the config
+ .map(DriverExecutionProfile.class::cast) // widens the wildcard map value type to DriverExecutionProfile
+ .or(() -> Optional.ofNullable(profiles.get(baseProfileName))) // 2. otherwise the base profile, when present
+ .orElseGet(config::getDefaultProfile); // 3. otherwise the default profile of the driver config
+ };
+ }
+
+ private final BiFunction<CqlSession, String, DriverExecutionProfile>
profileLocatorFunction; // lookup strategy bound to this constant's base profile name
+
+ ProfileLocator(BiFunction<CqlSession, String, DriverExecutionProfile>
profileLocatorFunction) {
+ this.profileLocatorFunction = profileLocatorFunction;
+ }
+
+ public DriverExecutionProfile locateProfile(CqlSession session, String
daoName) { // returns the profile selected for daoName, reading the config of the given session
+ return profileLocatorFunction.apply(session, daoName);
+ }
+}
diff --git a/backends-common/cassandra/src/test/resources/cassandra-driver.conf
b/backends-common/cassandra/src/test/resources/cassandra-driver.conf
index 09e228b669..fff72656e9 100644
--- a/backends-common/cassandra/src/test/resources/cassandra-driver.conf
+++ b/backends-common/cassandra/src/test/resources/cassandra-driver.conf
@@ -1,5 +1,28 @@
datastax-java-driver {
- basic.request {
- timeout = 5 seconds
- }
+ basic.request {
+ timeout = 5 seconds
+ consistency = QUORUM
+ page-size = 5000
+ serial-consistency = SERIAL
+ }
+ profiles {
+ # Provides controls on Execution profiles used by James
+ LWT {
+ basic.request.consistency = SERIAL
+ basic.request.serial-consistency = SERIAL
+ }
+ READ {
+ basic.request.consistency = QUORUM
+ }
+ WRITE {
+ basic.request.consistency = QUORUM
+ }
+ OPTIMISTIC_CONSISTENCY_LEVEL {
+ basic.request.consistency = LOCAL_ONE
+ basic.request.serial-consistency = LOCAL_ONE
+ }
+ BATCH {
+ basic.request.timeout = 1 hour
+ }
+ }
}
diff --git a/docs/modules/servers/pages/distributed/configure/cassandra.adoc
b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
index a953ece304..9821b4d0cc 100644
--- a/docs/modules/servers/pages/distributed/configure/cassandra.adoc
+++ b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
@@ -162,4 +162,59 @@ features to your users, you can consider disabling them in
order to improve perf
| Optional, defaults to 0. Defensive value to add to the uids and modseqs
generated. This can be used as a heuristic to maintain
consistency even when consensus of Lightweight Transactions is broken, for example
during a disaster recovery process.
-|===
\ No newline at end of file
+|===
+
+== Extra execution profiles
+
+Operators can specify fine-grained execution profiles. This allows setting
per-DAO request settings, such as consistency levels.
+
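+At start time, each DAO selects the most specific profile available (for example `READ-MAILBOX`). If none is defined, it falls back
to the generic `READ` or `WRITE` profile, and then to the driver's default profile.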
+
+Profiles include:
+
+ - READ-ACLV2
+ - WRITE-ACLV2
+ - READ-APPLICABLE-FLAGS
+ - WRITE-APPLICABLE-FLAGS
+ - READ-ATTACHMENTV2
+ - WRITE-ATTACHMENTV2
+ - READ-MAILBOX
+ - WRITE-MAILBOX
+ - READ-MAILBOXPATHV3
+ - WRITE-MAILBOXPATHV3
+ - READ-THREAD
+ - WRITE-THREAD
+ - READ-THREAD-LOOKUP
+ - WRITE-THREAD-LOOKUP
+ - READ-UID (only used for IMAP STATUS/SELECT)
+ - READ-MODSEQ (only used for IMAP STATUS/SELECT)
+ - READ-MESSAGEV3
+ - WRITE-MESSAGEV3
+ - READ-RECENTS
+ - WRITE-RECENTS
+ - READ-IMAP-UID-TABLE
+ - WRITE-IMAP-UID-TABLE
+ - READ-FIRST-UNSEEN
+ - WRITE-FIRST-UNSEEN
+ - READ-MARKED_AS_DELETED
+ - WRITE-MARKED_AS_DELETED
+ - READ-VACATION
+ - WRITE-VACATION
+ - READ-VACATION-REGISTRY
+ - WRITE-VACATION-REGISTRY
+ - READ-USER
+ - WRITE-USER
+ - READ-RRT
+ - WRITE-RRT
+ - READ-RRT-SOURCE
+ - WRITE-RRT-SOURCE
+ - READ-DOMAIN
+ - WRITE-DOMAIN
+ - READ-CURRENT-QUOTA
+ - WRITE-CURRENT-QUOTA
+ - READ-QUOTA-LIMITS
+ - WRITE-QUOTA-LIMITS
+ - READ-PUSH-SUBSCRIPTION
+ - WRITE-PUSH-SUBSCRIPTION
+ - READ-USER-ACL
+ - WRITE-USER-ACL
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
index 71ee33537f..b10dec93eb 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
@@ -29,11 +29,13 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraACLV2Table;
import org.apache.james.mailbox.model.MailboxACL;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.github.fge.lambdas.Throwing;
@@ -49,10 +51,14 @@ public class CassandraACLDAOV2 {
private final PreparedStatement replaceRights;
private final PreparedStatement delete;
private final PreparedStatement read;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraACLDAOV2(CqlSession session) {
this.executor = new CassandraAsyncExecutor(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "ACLV2");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"ACLV2");
this.insertRights = prepareInsertRights(session);
this.removeRights = prepareRemoveRights(session);
this.replaceRights = prepareReplaceRights(session);
@@ -100,13 +106,15 @@ public class CassandraACLDAOV2 {
public Mono<Void> delete(CassandraId cassandraId) {
return executor.executeVoid(
delete.bind()
- .setUuid(CassandraACLV2Table.ID, cassandraId.asUuid()));
+ .setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
+ .setExecutionProfile(writeProfile));
}
public Mono<MailboxACL> getACL(CassandraId cassandraId) {
return executor.executeRows(
read.bind()
- .set(CassandraACLV2Table.ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID))
+ .set(CassandraACLV2Table.ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(Throwing.function(row -> {
MailboxACL.EntryKey entryKey =
MailboxACL.EntryKey.deserialize(row.getString(CassandraACLV2Table.KEY));
MailboxACL.Rfc4314Rights rights =
row.getSet(CassandraACLV2Table.RIGHTS, String.class)
@@ -125,17 +133,20 @@ public class CassandraACLDAOV2 {
return executor.executeVoid(insertRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY,
command.getEntryKey().serialize())
- .setSet(CassandraACLV2Table.RIGHTS,
ImmutableSet.copyOf(rightStrings), String.class));
+ .setSet(CassandraACLV2Table.RIGHTS,
ImmutableSet.copyOf(rightStrings), String.class)
+ .setExecutionProfile(writeProfile));
case REMOVE:
return executor.executeVoid(removeRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY,
command.getEntryKey().serialize())
- .setSet(CassandraACLV2Table.RIGHTS,
ImmutableSet.copyOf(rightStrings), String.class));
+ .setSet(CassandraACLV2Table.RIGHTS,
ImmutableSet.copyOf(rightStrings), String.class)
+ .setExecutionProfile(writeProfile));
case REPLACE:
return executor.executeVoid(replaceRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY,
command.getEntryKey().serialize())
- .setSet(CassandraACLV2Table.RIGHTS, rightStrings,
String.class));
+ .setSet(CassandraACLV2Table.RIGHTS, rightStrings,
String.class)
+ .setExecutionProfile(writeProfile));
default:
throw new NotImplementedException(command.getEditMode() + "is
not supported");
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
index 419652a9a9..43a4ddfd18 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
@@ -33,10 +33,12 @@ import jakarta.inject.Inject;
import jakarta.mail.Flags;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.Flag;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
@@ -47,6 +49,8 @@ public class CassandraApplicableFlagDAO {
private final PreparedStatement select;
private final PreparedStatement update;
private final PreparedStatement delete;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraApplicableFlagDAO(CqlSession session) {
@@ -54,6 +58,8 @@ public class CassandraApplicableFlagDAO {
this.select = prepareSelect(session);
this.delete = prepareDelete(session);
this.update = prepareUpdate(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"APPLICABLE-FLAGS");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"APPLICABLE-FLAGS");
}
private PreparedStatement prepareSelect(CqlSession session) {
@@ -79,13 +85,15 @@ public class CassandraApplicableFlagDAO {
public Mono<Void> delete(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeVoid(
delete.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid()));
+ .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .setExecutionProfile(writeProfile));
}
public Mono<Flags> retrieveApplicableFlag(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeSingleRow(
select.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid()))
+ .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .setExecutionProfile(readProfile))
.map(FlagsExtractor::getApplicableFlags);
}
@@ -95,6 +103,7 @@ public class CassandraApplicableFlagDAO {
}
return cassandraAsyncExecutor.executeVoid(update.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setSet(USER_FLAGS, toBeAdded, String.class));
+ .setSet(USER_FLAGS, toBeAdded, String.class)
+ .setExecutionProfile(writeProfile));
}
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
index 2a1b9f0e27..995ba01bf9 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
@@ -40,6 +40,7 @@ import java.util.Optional;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.blob.api.BlobId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.model.AttachmentId;
@@ -50,6 +51,7 @@ import
org.apache.james.mailbox.model.StringBackedAttachmentId;
import org.apache.james.util.DurationParser;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
@@ -153,6 +155,8 @@ public class CassandraAttachmentDAOV2 {
private final PreparedStatement selectStatement;
private final PreparedStatement listBlobs;
private final CqlSession session;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraAttachmentDAOV2(BlobId.Factory blobIdFactory, CqlSession
session) {
@@ -163,6 +167,9 @@ public class CassandraAttachmentDAOV2 {
this.insertStatement = prepareInsert();
this.deleteStatement = prepareDelete();
this.listBlobs = prepareSelectBlobs();
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"ATTACHMENTV2");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"ATTACHMENTV2");
}
private PreparedStatement prepareSelectBlobs() {
@@ -203,7 +210,8 @@ public class CassandraAttachmentDAOV2 {
Preconditions.checkArgument(attachmentId != null);
return cassandraAsyncExecutor.executeSingleRow(
selectStatement.bind()
- .setUuid(ID_AS_UUID, attachmentId.asUUID()))
+ .setUuid(ID_AS_UUID, attachmentId.asUUID())
+ .setExecutionProfile(readProfile))
.map(row -> CassandraAttachmentDAOV2.fromRow(row, blobIdFactory));
}
@@ -216,13 +224,15 @@ public class CassandraAttachmentDAOV2 {
.setLong(SIZE, attachment.getSize())
.setUuid(MESSAGE_ID, messageId.get())
.setString(TYPE, attachment.getType().asString())
- .setString(BLOB_ID, attachment.getBlobId().asString()));
+ .setString(BLOB_ID, attachment.getBlobId().asString())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> delete(AttachmentId attachmentId) {
return cassandraAsyncExecutor.executeVoid(
deleteStatement.bind()
- .setUuid(ID_AS_UUID, attachmentId.asUUID()));
+ .setUuid(ID_AS_UUID, attachmentId.asUUID())
+ .setExecutionProfile(writeProfile));
}
public Flux<BlobId> listBlobs() {
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index f440596c52..5c2bec7776 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -34,12 +34,14 @@ import java.util.stream.Stream;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.MessageRange;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
@@ -66,6 +68,8 @@ public class CassandraDeletedMessageDAO {
private final PreparedStatement selectBetweenUidStatement;
private final PreparedStatement selectFromUidStatement;
private final ProtocolVersion protocolVersion;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraDeletedMessageDAO(CqlSession session) {
@@ -78,6 +82,8 @@ public class CassandraDeletedMessageDAO {
this.selectBetweenUidStatement = prepareBetweenUidStatement(session);
this.selectFromUidStatement = prepareFromUidStatement(session);
this.protocolVersion = session.getContext().getProtocolVersion();
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"MARKED_AS_DELETED");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"MARKED_AS_DELETED");
}
private PreparedStatement prepareAllUidStatement(CqlSession session) {
@@ -136,7 +142,8 @@ public class CassandraDeletedMessageDAO {
return cassandraAsyncExecutor.executeVoid(
addStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong()));
+ .setLong(UID, uid.asLong())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> addDeleted(CassandraId cassandraId, List<MessageUid>
uids) {
@@ -144,7 +151,8 @@ public class CassandraDeletedMessageDAO {
return cassandraAsyncExecutor.executeVoid(
addStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uids.iterator().next().asLong()));
+ .setLong(UID, uids.iterator().next().asLong())
+ .setExecutionProfile(writeProfile));
} else {
Stream<BatchStatement> batches = Lists.partition(uids,
BATCH_STATEMENT_WINDOW)
.stream()
@@ -164,14 +172,16 @@ public class CassandraDeletedMessageDAO {
public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong()));
+ .setLong(UID, uid.asLong())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> removeDeleted(CassandraId cassandraId, List<MessageUid>
uids) {
if (uids.size() == 1) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uids.iterator().next().asLong()));
+ .setLong(UID, uids.iterator().next().asLong())
+ .setExecutionProfile(writeProfile));
} else {
Stream<BatchStatement> batches = Lists.partition(uids,
BATCH_STATEMENT_WINDOW)
.stream()
@@ -179,7 +189,8 @@ public class CassandraDeletedMessageDAO {
BatchStatementBuilder batch = new
BatchStatementBuilder(BatchType.UNLOGGED);
uidBatch.forEach(uid ->
batch.addStatement(deleteStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong())));
+ .setLong(UID, uid.asLong()))
+ .setExecutionProfile(writeProfile));
return batch.build();
});
return Flux.fromStream(batches)
@@ -190,7 +201,8 @@ public class CassandraDeletedMessageDAO {
public Mono<Void> removeAll(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
- .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID));
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(writeProfile));
}
public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId,
MessageRange range) {
@@ -216,14 +228,16 @@ public class CassandraDeletedMessageDAO {
private Flux<Row> retrieveAllDeleted(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeRows(
selectAllUidStatement.bind()
- .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID));
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile));
}
private Flux<Row> retrieveOneDeleted(CassandraId cassandraId, MessageUid
uid) {
return cassandraAsyncExecutor.executeRows(
selectOneUidStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong()));
+ .setLong(UID, uid.asLong())
+ .setExecutionProfile(readProfile));
}
private Flux<Row> retrieveDeletedBetween(CassandraId cassandraId,
MessageUid from, MessageUid to) {
@@ -231,14 +245,16 @@ public class CassandraDeletedMessageDAO {
selectBetweenUidStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
.setLong(UID_FROM, from.asLong())
- .setLong(UID_TO, to.asLong()));
+ .setLong(UID_TO, to.asLong())
+ .setExecutionProfile(readProfile));
}
private Flux<Row> retrieveDeletedAfter(CassandraId cassandraId, MessageUid
from) {
return cassandraAsyncExecutor.executeRows(
selectFromUidStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID_FROM, from.asLong()));
+ .setLong(UID_FROM, from.asLong())
+ .setExecutionProfile(readProfile));
}
private MessageUid asMessageUid(Row row) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index 934c60b9d7..3024e8d552 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -34,12 +34,14 @@ import java.util.stream.Stream;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.MessageRange;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
@@ -69,6 +71,8 @@ public class CassandraFirstUnseenDAO {
private final PreparedStatement selectBetweenUidStatement;
private final PreparedStatement selectFromUidStatement;
private final ProtocolVersion protocolVersion;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraFirstUnseenDAO(CqlSession session) {
@@ -82,6 +86,8 @@ public class CassandraFirstUnseenDAO {
this.selectOneUidStatement = prepareOneUidStatement(session);
this.selectBetweenUidStatement = prepareBetweenUidStatement(session);
this.selectFromUidStatement = prepareFromUidStatement(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "FIRST_UNSEEN");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "FIRST_UNSEEN");
}
private PreparedStatement prepareOneUidStatement(CqlSession session) {
@@ -150,7 +156,8 @@ public class CassandraFirstUnseenDAO {
return cassandraAsyncExecutor.executeVoid(
addStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong()));
+ .setLong(UID, uid.asLong())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> addUnread(CassandraId mailboxId, List<MessageUid> uids) {
@@ -158,7 +165,8 @@ public class CassandraFirstUnseenDAO {
return cassandraAsyncExecutor.executeVoid(
addStatement.bind()
.setUuid(MAILBOX_ID, mailboxId.asUuid())
- .setLong(UID, uids.iterator().next().asLong()));
+ .setLong(UID, uids.iterator().next().asLong())
+ .setExecutionProfile(writeProfile));
} else {
Stream<BatchStatement> batches = Lists.partition(uids,
BATCH_STATEMENT_WINDOW)
.stream()
@@ -178,14 +186,16 @@ public class CassandraFirstUnseenDAO {
public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setLong(UID, uid.asLong()));
+ .setLong(UID, uid.asLong())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> removeUnread(CassandraId mailboxId, List<MessageUid>
uids) {
if (uids.size() == 1) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUuid(MAILBOX_ID, mailboxId.asUuid())
- .setLong(UID, uids.iterator().next().asLong()));
+ .setLong(UID, uids.iterator().next().asLong())
+ .setExecutionProfile(writeProfile));
} else {
Stream<BatchStatement> batches = Lists.partition(uids,
BATCH_STATEMENT_WINDOW)
.stream()
@@ -193,7 +203,8 @@ public class CassandraFirstUnseenDAO {
BatchStatementBuilder batch = new
BatchStatementBuilder(BatchType.UNLOGGED);
uidBatch.forEach(uid ->
batch.addStatement(deleteStatement.bind()
.setUuid(MAILBOX_ID, mailboxId.asUuid())
- .setLong(UID, uid.asLong())));
+ .setLong(UID, uid.asLong()))
+ .setExecutionProfile(writeProfile));
return batch.build();
});
return Flux.fromStream(batches)
@@ -204,20 +215,23 @@ public class CassandraFirstUnseenDAO {
public Mono<Void> removeAll(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
- .setUuid(MAILBOX_ID, cassandraId.asUuid()));
+ .setUuid(MAILBOX_ID, cassandraId.asUuid())
+ .setExecutionProfile(writeProfile));
}
public Mono<MessageUid> retrieveFirstUnread(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeSingleRow(
readStatement.bind()
- .setUuid(MAILBOX_ID, cassandraId.asUuid()))
+ .setUuid(MAILBOX_ID, cassandraId.asUuid())
+ .setExecutionProfile(readProfile))
.map(this::asMessageUid);
}
public Flux<MessageUid> listUnseen(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeRows(
listStatement.bind()
- .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID))
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(this::asMessageUid);
}
@@ -229,20 +243,23 @@ public class CassandraFirstUnseenDAO {
return cassandraAsyncExecutor.executeRows(
selectFromUidStatement.bind()
.setLong(UID_FROM, range.getUidFrom().asLong())
- .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID))
+ .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(this::asMessageUid);
case RANGE:
return cassandraAsyncExecutor.executeRows(
selectBetweenUidStatement.bind()
.setLong(UID_FROM, range.getUidFrom().asLong())
.setLong(UID_TO, range.getUidTo().asLong())
- .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID))
+ .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(this::asMessageUid);
case ONE:
return cassandraAsyncExecutor.executeRows(
selectOneUidStatement.bind()
.setLong(UID, range.getUidFrom().asLong())
- .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID))
+ .set(MAILBOX_ID, cassandraId.asUuid(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(this::asMessageUid);
default:
throw new RuntimeException("Unsupported range type " +
range.getType());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index 16aec0233b..d0833a4f0f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -34,11 +34,13 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersT
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxCounters;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
@@ -59,6 +61,8 @@ public class CassandraMailboxCounterDAO {
private final PreparedStatement incrementUnseenAndCountStatement;
private final PreparedStatement decrementUnseenAndCountStatement;
private final PreparedStatement deleteStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMailboxCounterDAO(CqlSession session) {
@@ -97,6 +101,8 @@ public class CassandraMailboxCounterDAO {
deleteStatement = session.prepare(deleteFrom(TABLE_NAME)
.where(column(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)))
.build());
+ readProfile = ProfileLocator.READ.locateProfile(session, "COUNTERS");
+ writeProfile = ProfileLocator.WRITE.locateProfile(session, "COUNTERS");
}
private PreparedStatement createReadStatement(CqlSession session) {
@@ -116,11 +122,11 @@ public class CassandraMailboxCounterDAO {
}
public Mono<Void> delete(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
deleteStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
deleteStatement).setExecutionProfile(writeProfile));
}
public Mono<MailboxCounters> retrieveMailboxCounters(CassandraId
mailboxId) {
- return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId,
readStatement))
+ return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId,
readStatement).setExecutionProfile(readProfile))
.map(row -> MailboxCounters.builder()
.mailboxId(mailboxId)
.count(row.getLong(COUNT))
@@ -147,7 +153,8 @@ public class CassandraMailboxCounterDAO {
return cassandraAsyncExecutor.executeVoid(
bindWithMailbox(mailboxId, addToCounters)
.setLong(COUNT, counters.getCount())
- .setLong(UNSEEN, counters.getUnseen()));
+ .setLong(UNSEEN, counters.getUnseen())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> remove(MailboxCounters counters) {
@@ -155,7 +162,8 @@ public class CassandraMailboxCounterDAO {
return cassandraAsyncExecutor.executeVoid(
bindWithMailbox(mailboxId, removeToCounters)
.setLong(COUNT, counters.getCount())
- .setLong(UNSEEN, counters.getUnseen()));
+ .setLong(UNSEEN, counters.getUnseen())
+ .setExecutionProfile(writeProfile));
}
public Mono<Long> countMessagesInMailbox(Mailbox mailbox) {
@@ -165,46 +173,39 @@ public class CassandraMailboxCounterDAO {
}
public Mono<Long> countMessagesInMailbox(CassandraId cassandraId) {
- return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(cassandraId,
readStatement))
+ return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(cassandraId,
readStatement).setExecutionProfile(readProfile))
.map(row -> row.getLong(COUNT));
}
public Mono<Long> countUnseenMessagesInMailbox(Mailbox mailbox) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId,
readStatement))
+ return
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId,
readStatement).setExecutionProfile(readProfile))
.map(row -> row.getLong(UNSEEN));
}
public Mono<Void> decrementCount(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementMessageCountStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementMessageCountStatement).setExecutionProfile(writeProfile));
}
public Mono<Void> incrementCount(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementMessageCountStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementMessageCountStatement).setExecutionProfile(writeProfile));
}
public Mono<Void> decrementUnseen(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementUnseenCountStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementUnseenCountStatement).setExecutionProfile(writeProfile));
}
public Mono<Void> incrementUnseen(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementUnseenCountStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementUnseenCountStatement).setExecutionProfile(writeProfile));
}
public Mono<Void> decrementUnseenAndCount(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementUnseenAndCountStatement));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
decrementUnseenAndCountStatement).setExecutionProfile(writeProfile));
}
public Mono<Void> incrementUnseenAndCount(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementUnseenAndCountStatement));
- }
-
- public Mono<Void> incrementUnseenAndCount(CassandraId mailboxId, long
count, long unseen) {
- return cassandraAsyncExecutor.executeVoid(
- bindWithMailbox(mailboxId, addToCounters)
- .setLong(COUNT, count)
- .setLong(UNSEEN, unseen));
+ return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId,
incrementUnseenAndCountStatement).setExecutionProfile(writeProfile));
}
private BoundStatement bindWithMailbox(CassandraId mailboxId,
PreparedStatement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 940cecacb8..6b92ed58fa 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -38,6 +38,7 @@ import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.mail.utils.MailboxBaseTupleUtil;
@@ -47,6 +48,7 @@ import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.UidValidity;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.UdtValue;
@@ -68,6 +70,8 @@ public class CassandraMailboxDAO {
private final PreparedStatement updateUidValidityStatement;
private final CqlSession session;
private final TypeCodec<UdtValue> mailboxBaseTypeCodec;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMailboxDAO(CqlSession session, CassandraTypesProvider
typesProvider) {
@@ -82,6 +86,8 @@ public class CassandraMailboxDAO {
this.readStatement = prepareRead();
this.mailboxBaseTypeCodec =
CodecRegistry.DEFAULT.codecFor(typesProvider.getDefinedUserType(MAILBOX_BASE.asCql(true)));
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "MAILBOX");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MAILBOX");
}
private PreparedStatement prepareInsert() {
@@ -133,24 +139,28 @@ public class CassandraMailboxDAO {
.setUuid(ID, cassandraId.asUuid())
.setString(NAME, mailbox.getName())
.setLong(UIDVALIDITY, mailbox.getUidValidity().asLong())
- .setUdtValue(MAILBOX_BASE,
mailboxBaseTupleUtil.createMailboxBaseUDT(mailbox.getNamespace(),
mailbox.getUser())));
+ .setUdtValue(MAILBOX_BASE,
mailboxBaseTupleUtil.createMailboxBaseUDT(mailbox.getNamespace(),
mailbox.getUser()))
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> updatePath(CassandraId mailboxId, MailboxPath
mailboxPath) {
return executor.executeVoid(updateStatement.bind()
.setUuid(ID, mailboxId.asUuid())
.setString(NAME, mailboxPath.getName())
- .setUdtValue(MAILBOX_BASE,
mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(),
mailboxPath.getUser())));
+ .setUdtValue(MAILBOX_BASE,
mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(),
mailboxPath.getUser()))
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> delete(CassandraId mailboxId) {
return executor.executeVoid(deleteStatement.bind()
- .setUuid(ID, mailboxId.asUuid()));
+ .setUuid(ID, mailboxId.asUuid())
+ .setExecutionProfile(writeProfile));
}
public Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) {
return executor.executeSingleRow(readStatement.bind()
- .set(ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID))
+ .set(ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.flatMap(row -> mailboxFromRow(row, mailboxId));
}
@@ -185,7 +195,8 @@ public class CassandraMailboxDAO {
private Mono<Void> updateUidValidity(CassandraId cassandraId, UidValidity
uidValidity) {
return executor.executeVoid(updateUidValidityStatement.bind()
.setUuid(ID, cassandraId.asUuid())
- .setLong(UIDVALIDITY, uidValidity.asLong()));
+ .setLong(UIDVALIDITY, uidValidity.asLong())
+ .setExecutionProfile(writeProfile));
}
public Flux<Mailbox> retrieveAllMailboxes() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
index 22c0f31c59..d6b4217ee9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
@@ -38,6 +38,7 @@ import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.mailbox.cassandra.GhostMailbox;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -67,6 +68,8 @@ public class CassandraMailboxPathV3DAO {
private final PreparedStatement selectAll;
private final CqlSession session;
private final DriverExecutionProfile lwtProfile;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMailboxPathV3DAO(CqlSession session) {
@@ -78,6 +81,8 @@ public class CassandraMailboxPathV3DAO {
this.selectUser = prepareSelectUser();
this.selectAll = prepareSelectAll();
this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "MAILBOXPATHV3");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MAILBOXPATHV3");
}
private PreparedStatement prepareDelete() {
@@ -146,6 +151,8 @@ public class CassandraMailboxPathV3DAO {
if (consistencyChoice.equals(STRONG)) {
statementBuilder.setExecutionProfile(lwtProfile);
+ } else {
+ statementBuilder.setExecutionProfile(readProfile);
}
return cassandraAsyncExecutor.executeRows(statementBuilder.build())
@@ -240,7 +247,7 @@ public class CassandraMailboxPathV3DAO {
if (consistencyChoice.equals(STRONG)) {
return statement.setExecutionProfile(lwtProfile);
} else {
- return statement;
+ return statement.setExecutionProfile(readProfile);
}
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index 80de0e5d9b..0ce0e36d9e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -31,6 +31,7 @@ import java.util.stream.Stream;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
@@ -38,6 +39,7 @@ import org.apache.james.mailbox.store.StoreMessageManager;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
@@ -59,6 +61,8 @@ public class CassandraMailboxRecentsDAO {
private final PreparedStatement deleteAllStatement;
private final PreparedStatement addStatement;
private final ProtocolVersion protocolVersion;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMailboxRecentsDAO(CqlSession session) {
@@ -68,6 +72,8 @@ public class CassandraMailboxRecentsDAO {
deleteAllStatement = createDeleteAllStatement(session);
addStatement = createAddStatement(session);
protocolVersion = session.getContext().getProtocolVersion();
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "RECENTS");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "RECENTS");
}
private PreparedStatement createReadStatement(CqlSession session) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index a9221f0c5f..050cc18bf6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -65,7 +65,10 @@ import jakarta.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
@@ -84,6 +87,7 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -114,13 +118,17 @@ public class CassandraMessageDAOV3 {
private final PreparedStatement delete;
private final PreparedStatement select;
private final PreparedStatement listBlobs;
+ private final CassandraConfiguration configuration;
private final Cid.CidParser cidParser;
private final UserDefinedType attachmentsType;
private final TypeCodec<List<UdtValue>> attachmentCodec;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
+ private final DriverExecutionProfile optimisticConsistencyLevelProfile;
@Inject
public CassandraMessageDAOV3(CqlSession session, CassandraTypesProvider
typesProvider, BlobStore blobStore,
- BlobId.Factory blobIdFactory) {
+ BlobId.Factory blobIdFactory,
CassandraConfiguration cassandraConfiguration) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.blobStore = blobStore;
this.blobIdFactory = blobIdFactory;
@@ -129,9 +137,14 @@ public class CassandraMessageDAOV3 {
this.delete = prepareDelete(session);
this.select = prepareSelect(session);
this.listBlobs = prepareSelectBlobs(session);
+ this.configuration = cassandraConfiguration;
this.cidParser = Cid.parser().relaxed();
this.attachmentsType =
typesProvider.getDefinedUserType(ATTACHMENTS.asCql(true));
this.attachmentCodec =
CodecRegistry.DEFAULT.codecFor(listOf(attachmentsType));
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "MESSAGEV3");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MESSAGEV3");
+ this.optimisticConsistencyLevelProfile = JamesExecutionProfiles.getOptimisticConsistencyLevelProfile(session);
}
private PreparedStatement prepareSelect(CqlSession session) {
@@ -262,7 +275,8 @@ public class CassandraMessageDAOV3 {
.setString(CONTENT_LOCATION,
message.getProperties().getContentLocation())
.set(CONTENT_LANGUAGE,
message.getProperties().getContentLanguage(), LIST_OF_STRINGS_CODEC)
.set(CONTENT_DISPOSITION_PARAMETERS,
message.getProperties().getContentDispositionParameters(), MAP_OF_STRINGS_CODEC)
- .set(CONTENT_TYPE_PARAMETERS,
message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC);
+ .set(CONTENT_TYPE_PARAMETERS,
message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC)
+ .setExecutionProfile(writeProfile);
if (message.getAttachments().isEmpty()) {
return boundStatement.unset(ATTACHMENTS);
@@ -321,14 +335,20 @@ public class CassandraMessageDAOV3 {
}
public Mono<MessageRepresentation> retrieveMessage(CassandraMessageId
cassandraMessageId, FetchType fetchType) {
- return retrieveRow(cassandraMessageId)
+ if (configuration.isOptimisticConsistencyLevel()) {
+ return retrieveRow(cassandraMessageId,
optimisticConsistencyLevelProfile)
+ .switchIfEmpty(Mono.defer(() ->
retrieveRow(cassandraMessageId, readProfile)))
+ .flatMap(row -> message(row, cassandraMessageId, fetchType));
+ }
+ return retrieveRow(cassandraMessageId, readProfile)
.flatMap(row -> message(row, cassandraMessageId, fetchType));
}
- private Mono<Row> retrieveRow(CassandraMessageId messageId) {
+ private Mono<Row> retrieveRow(CassandraMessageId messageId,
DriverExecutionProfile driverExecutionProfile) {
return cassandraAsyncExecutor.executeSingleRow(select
.bind()
- .set(MESSAGE_ID, messageId.get(), TypeCodecs.TIMEUUID));
+ .set(MESSAGE_ID, messageId.get(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(driverExecutionProfile));
}
private Mono<MessageRepresentation> message(Row row, CassandraMessageId
cassandraMessageId, FetchType fetchType) {
@@ -389,7 +409,8 @@ public class CassandraMessageDAOV3 {
public Mono<Void> delete(CassandraMessageId messageId) {
return cassandraAsyncExecutor.executeVoid(delete.bind()
- .setUuid(MESSAGE_ID, messageId.get()));
+ .setUuid(MESSAGE_ID, messageId.get())
+ .setExecutionProfile(writeProfile));
}
private Mono<Content> buildContentRetriever(FetchType fetchType, BlobId
headerId, BlobId bodyId) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 556f30038a..f0ddb2eb2d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -59,6 +59,7 @@ import jakarta.mail.Flags;
import jakarta.mail.Flags.Flag;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.blob.api.BlobId;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
@@ -74,6 +75,7 @@ import org.apache.james.util.streams.Limit;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -129,6 +131,9 @@ public class CassandraMessageIdDAO {
private final PreparedStatement update;
private final PreparedStatement listStatement;
private final ProtocolVersion protocolVersion;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile listUidProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMessageIdDAO(CqlSession session, BlobId.Factory
blobIdFactory) {
@@ -151,6 +156,9 @@ public class CassandraMessageIdDAO {
this.listStatement = prepareList(session);
this.selectMetadataRange = prepareSelectMetadataRange(session);
this.selectNotDeletedRange = prepareSelectNotDeletedRange(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "MESSAGEID");
+ this.listUidProfile = ProfileLocator.READ.locateProfile(session, "MESSAGEID-LIST-UID");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MESSAGEID");
}
private PreparedStatement prepareDelete(CqlSession session) {
@@ -322,7 +330,8 @@ public class CassandraMessageIdDAO {
public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(delete.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
- .setLong(IMAP_UID, uid.asLong()));
+ .setLong(IMAP_UID, uid.asLong())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> insert(CassandraMessageMetadata metadata) {
@@ -356,6 +365,7 @@ public class CassandraMessageIdDAO {
.setInt(BODY_START_OCTET,
Math.toIntExact(metadata.getBodyStartOctet().get()))
.setLong(FULL_CONTENT_OCTETS, metadata.getSize().get())
.setString(HEADER_CONTENT,
metadata.getHeaderContent().get().asString())
+ .setExecutionProfile(writeProfile)
.build());
}
@@ -420,6 +430,7 @@ public class CassandraMessageIdDAO {
} else {
statementBuilder.setSet(REMOVED_USERS_FLAGS, removedFlags,
String.class);
}
+ statementBuilder.setExecutionProfile(writeProfile);
return statementBuilder.build();
}
@@ -436,7 +447,8 @@ public class CassandraMessageIdDAO {
private Mono<Row> selectOneRow(CassandraId mailboxId, MessageUid uid) {
return cassandraAsyncExecutor.executeSingleRow(select.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
- .setLong(IMAP_UID, uid.asLong()));
+ .setLong(IMAP_UID, uid.asLong())
+ .setExecutionProfile(readProfile));
}
public Flux<CassandraMessageMetadata> retrieveMessages(CassandraId
mailboxId, MessageRange set, Limit limit) {
@@ -447,7 +459,8 @@ public class CassandraMessageIdDAO {
public Flux<MessageUid> listUids(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeRows(selectAllUids.bind()
- .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID))
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(listUidProfile))
.map(row ->
MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0),
protocolVersion)));
}
@@ -455,7 +468,8 @@ public class CassandraMessageIdDAO {
return cassandraAsyncExecutor.executeRows(selectMetadataRange.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
- .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
+ .setLong(IMAP_UID_LTE, range.getUidTo().asLong())
+ .setExecutionProfile(readProfile))
.map(row -> {
CassandraMessageId messageId =
CassandraMessageId.Factory.of(row.get(MESSAGE_ID, TypeCodecs.TIMEUUID));
return ComposedMessageIdWithMetaData.builder()
@@ -476,7 +490,8 @@ public class CassandraMessageIdDAO {
return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
- .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
+ .setLong(IMAP_UID_LTE, range.getUidTo().asLong())
+ .setExecutionProfile(listUidProfile))
.filter(row -> !TypeCodecs.BOOLEAN.decodePrimitive(
row.getBytesUnsafe(deletedPosition.get(() ->
row.getColumnDefinitions().firstIndexOf(DELETED))), protocolVersion))
.map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(
@@ -487,7 +502,8 @@ public class CassandraMessageIdDAO {
return cassandraAsyncExecutor.executeRows(selectUidOnlyRange.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
- .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
+ .setLong(IMAP_UID_LTE, range.getUidTo().asLong())
+ .setExecutionProfile(listUidProfile))
.map(row ->
MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0),
protocolVersion)));
}
@@ -523,9 +539,11 @@ public class CassandraMessageIdDAO {
return cassandraAsyncExecutor.executeRows(limit.getLimit()
.map(limitAsInt -> selectAllLimited.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
- .setInt(LIMIT, limitAsInt))
+ .setInt(LIMIT, limitAsInt)
+ .setExecutionProfile(readProfile))
.orElseGet(() -> selectAll.bind()
- .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)));
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile)));
}
private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid, Limit
limit) {
@@ -533,10 +551,12 @@ public class CassandraMessageIdDAO {
.map(limitAsInt -> selectUidGteLimited.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, uid.asLong())
- .setInt(LIMIT, limitAsInt))
+ .setInt(LIMIT, limitAsInt)
+ .setExecutionProfile(readProfile))
.orElseGet(() -> selectUidGte.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
- .setLong(IMAP_UID, uid.asLong())));
+ .setLong(IMAP_UID, uid.asLong())
+ .setExecutionProfile(readProfile)));
}
private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from,
MessageUid to, Limit limit) {
@@ -545,11 +565,13 @@ public class CassandraMessageIdDAO {
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, from.asLong())
.setLong(IMAP_UID_LTE, to.asLong())
- .setInt(LIMIT, limitAsInt))
+ .setInt(LIMIT, limitAsInt)
+ .setExecutionProfile(readProfile))
.orElseGet(() -> selectUidRange.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, from.asLong())
- .setLong(IMAP_UID_LTE, to.asLong())));
+ .setLong(IMAP_UID_LTE, to.asLong())
+ .setExecutionProfile(readProfile)));
}
private Optional<CassandraMessageMetadata>
fromRowToComposedMessageIdWithFlags(Row row) {
@@ -622,6 +644,7 @@ public class CassandraMessageIdDAO {
.setInt(BODY_START_OCTET, 0)
.setLong(FULL_CONTENT_OCTETS, 0)
.setString(HEADER_CONTENT, null)
+ .setExecutionProfile(writeProfile)
.build());
}
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index ec58165fc6..6f793a0cef 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -59,6 +59,7 @@ import jakarta.mail.Flags.Flag;
import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.blob.api.BlobId;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
@@ -96,7 +97,6 @@ public class CassandraMessageIdToImapUidDAO {
private final BlobId.Factory blobIdFactory;
private final PreparedStatement delete;
private final PreparedStatement insert;
- private final PreparedStatement insertForced;
private final PreparedStatement update;
private final PreparedStatement selectAll;
private final PreparedStatement select;
@@ -104,6 +104,8 @@ public class CassandraMessageIdToImapUidDAO {
private final CassandraConfiguration cassandraConfiguration;
private final CqlSession session;
private final DriverExecutionProfile lwtProfile;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMessageIdToImapUidDAO(CqlSession session, BlobId.Factory
blobIdFactory,
@@ -114,12 +116,13 @@ public class CassandraMessageIdToImapUidDAO {
this.cassandraConfiguration = cassandraConfiguration;
this.delete = prepareDelete();
this.insert = prepareInsert();
- this.insertForced = prepareInsertForced();
this.update = prepareUpdate();
this.selectAll = prepareSelectAll();
this.select = prepareSelect();
this.listStatement = prepareList();
this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"IMAP-UID-TABLE");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"IMAP-UID-TABLE");
}
private PreparedStatement prepareDelete() {
@@ -175,28 +178,6 @@ public class CassandraMessageIdToImapUidDAO {
}
}
- private PreparedStatement prepareInsertForced() {
- Insert insert = insertInto(TABLE_NAME)
- .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
- .value(MAILBOX_ID, bindMarker(MAILBOX_ID))
- .value(IMAP_UID, bindMarker(IMAP_UID))
- .value(MOD_SEQ, bindMarker(MOD_SEQ))
- .value(ANSWERED, bindMarker(ANSWERED))
- .value(DELETED, bindMarker(DELETED))
- .value(DRAFT, bindMarker(DRAFT))
- .value(FLAGGED, bindMarker(FLAGGED))
- .value(RECENT, bindMarker(RECENT))
- .value(SEEN, bindMarker(SEEN))
- .value(USER, bindMarker(USER))
- .value(USER_FLAGS, bindMarker(USER_FLAGS))
- .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
- .value(SAVE_DATE, bindMarker(SAVE_DATE))
- .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
- .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
- .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT));
- return session.prepare(insert.build());
- }
-
private PreparedStatement prepareUpdate() {
Update update = QueryBuilder.update(TABLE_NAME)
.set(setColumn(MOD_SEQ, bindMarker(MOD_SEQ)),
@@ -242,7 +223,8 @@ public class CassandraMessageIdToImapUidDAO {
public Mono<Void> delete(CassandraMessageId messageId, CassandraId
mailboxId) {
return cassandraAsyncExecutor.executeVoid(delete.bind()
.setUuid(MESSAGE_ID, messageId.get())
- .setUuid(MAILBOX_ID, mailboxId.asUuid()));
+ .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> insert(CassandraMessageMetadata metadata) {
@@ -275,32 +257,10 @@ public class CassandraMessageIdToImapUidDAO {
.setInt(BODY_START_OCTET,
Math.toIntExact(metadata.getBodyStartOctet().get()))
.setLong(FULL_CONTENT_OCTETS, metadata.getSize().get())
.setString(HEADER_CONTENT,
metadata.getHeaderContent().get().asString())
+ .setExecutionProfile(writeProfile)
.build());
}
- public Mono<Void> insertForce(CassandraMessageMetadata metadata) {
- ComposedMessageId composedMessageId =
metadata.getComposedMessageId().getComposedMessageId();
- Flags flags = metadata.getComposedMessageId().getFlags();
- return cassandraAsyncExecutor.executeVoid(insertForced.bind()
- .set(MESSAGE_ID, ((CassandraMessageId)
composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID)
- .set(MAILBOX_ID, ((CassandraId)
composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID)
- .setLong(IMAP_UID, composedMessageId.getUid().asLong())
- .setLong(MOD_SEQ,
metadata.getComposedMessageId().getModSeq().asLong())
- .setBoolean(ANSWERED, flags.contains(Flag.ANSWERED))
- .setBoolean(DELETED, flags.contains(Flag.DELETED))
- .setBoolean(DRAFT, flags.contains(Flag.DRAFT))
- .setBoolean(FLAGGED, flags.contains(Flag.FLAGGED))
- .setBoolean(RECENT, flags.contains(Flag.RECENT))
- .setBoolean(SEEN, flags.contains(Flag.SEEN))
- .setBoolean(USER, flags.contains(Flag.USER))
- .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()),
String.class)
- .setInstant(INTERNAL_DATE,
metadata.getInternalDate().get().toInstant())
- .setInstant(SAVE_DATE,
metadata.getSaveDate().map(Date::toInstant).orElse(null))
- .setInt(BODY_START_OCTET,
Math.toIntExact(metadata.getBodyStartOctet().get()))
- .setLong(FULL_CONTENT_OCTETS, metadata.getSize().get())
- .setString(HEADER_CONTENT,
metadata.getHeaderContent().get().asString()));
- }
-
public Mono<Boolean> updateMetadata(ComposedMessageId id, UpdatedFlags
updatedFlags, ModSeq previousModeq) {
if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
return
cassandraAsyncExecutor.executeReturnApplied(updateBoundStatement(id,
updatedFlags, previousModeq));
@@ -432,7 +392,7 @@ public class CassandraMessageIdToImapUidDAO {
if (consistencyChoice.equals(STRONG)) {
return statement.setExecutionProfile(lwtProfile);
} else {
- return statement;
+ return statement.setExecutionProfile(readProfile);
}
}
@@ -466,6 +426,7 @@ public class CassandraMessageIdToImapUidDAO {
.setInt(BODY_START_OCTET, 0)
.setLong(FULL_CONTENT_OCTETS, 0)
.setString(HEADER_CONTENT, null)
+ .setExecutionProfile(writeProfile)
.build());
}
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 92ce031e8b..a5d0de7b4c 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -39,6 +39,7 @@ import jakarta.inject.Inject;
import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.exception.MailboxException;
@@ -92,6 +93,7 @@ public class CassandraModSeqProvider implements
ModSeqProvider {
private final RetryBackoffSpec retrySpec;
private final DriverExecutionProfile lwtProfile;
private final CassandraConfiguration cassandraConfiguration;
+ private final DriverExecutionProfile readProfile;
@Inject
public CassandraModSeqProvider(CqlSession session, CassandraConfiguration
cassandraConfiguration) {
@@ -104,6 +106,7 @@ public class CassandraModSeqProvider implements
ModSeqProvider {
this.retrySpec =
Retry.backoff(cassandraConfiguration.getModSeqMaxRetry(), firstBackoff)
.scheduler(Schedulers.parallel());
this.cassandraConfiguration = cassandraConfiguration;
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"MODSEQ");
}
private PreparedStatement prepareInsert(CqlSession session) {
@@ -151,11 +154,17 @@ public class CassandraModSeqProvider implements
ModSeqProvider {
@Override
public ModSeq highestModSeq(MailboxId mailboxId) throws MailboxException {
return unbox(() -> findHighestModSeq((CassandraId) mailboxId,
- Optional.of(lwtProfile).filter(any ->
cassandraConfiguration.isUidReadStrongConsistency()))
+ locateReadProfile())
.block().orElse(ModSeq.first()))
.add(cassandraConfiguration.getUidModseqIncrement());
}
+ private Optional<DriverExecutionProfile> locateReadProfile() {
+ return Optional.of(lwtProfile)
+ .filter(any -> cassandraConfiguration.isUidReadStrongConsistency())
+ .or(() -> Optional.of(readProfile));
+ }
+
private Mono<Optional<ModSeq>> findHighestModSeq(CassandraId mailboxId,
Optional<DriverExecutionProfile> executionProfile) {
BoundStatement statement = select.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID);
@@ -207,7 +216,7 @@ public class CassandraModSeqProvider implements
ModSeqProvider {
@Override
public Mono<ModSeq> highestModSeqReactive(Mailbox mailbox) {
- return findHighestModSeq((CassandraId) mailbox.getMailboxId(),
Optional.empty())
+ return findHighestModSeq((CassandraId) mailbox.getMailboxId(),
locateReadProfile())
.map(optional -> optional.orElse(ModSeq.first()))
.map(modSeq ->
modSeq.add(cassandraConfiguration.getUidModseqIncrement()));
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
index da747b082d..6ea4ba3f49 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
@@ -39,12 +39,14 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.ThreadId;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
@@ -56,6 +58,8 @@ public class CassandraThreadDAO {
private final PreparedStatement insertOne;
private final PreparedStatement selectOne;
private final PreparedStatement deleteOne;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraThreadDAO(CqlSession session) {
@@ -79,6 +83,10 @@ public class CassandraThreadDAO {
.where(column(USERNAME).isEqualTo(bindMarker(USERNAME)),
column(MIME_MESSAGE_ID).isEqualTo(bindMarker(MIME_MESSAGE_ID)))
.build());
+
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"THREAD");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"THREAD");
}
public Flux<Void> insertSome(Username username, Set<Integer>
hashMimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Integer>
hashBaseSubject) {
@@ -88,7 +96,8 @@ public class CassandraThreadDAO {
.set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)
.set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID)
.set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
- .set(BASE_SUBJECT, hashBaseSubject.orElse(null),
TypeCodecs.INT)), DEFAULT_CONCURRENCY);
+ .set(BASE_SUBJECT, hashBaseSubject.orElse(null),
TypeCodecs.INT)
+ .setExecutionProfile(writeProfile)), DEFAULT_CONCURRENCY);
}
public Flux<Pair<Optional<Integer>, ThreadId>> selectSome(Username
username, Set<Integer> hashMimeMessageIds) {
@@ -96,7 +105,8 @@ public class CassandraThreadDAO {
.flatMap(mimeMessageId -> executor
.executeSingleRow(selectOne.bind()
.set(USERNAME, username.asString(), TypeCodecs.TEXT)
- .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT))
+ .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)
+ .setExecutionProfile(readProfile))
.map(this::readRow), DEFAULT_CONCURRENCY)
.distinct();
}
@@ -105,7 +115,8 @@ public class CassandraThreadDAO {
return Flux.fromIterable(hashMimeMessageIds)
.flatMap(mimeMessageId -> executor.executeVoid(deleteOne.bind()
.set(USERNAME, username.asString(), TypeCodecs.TEXT)
- .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)));
+ .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)
+ .setExecutionProfile(writeProfile)));
}
public Pair<Optional<Integer>, ThreadId> readRow(Row row) {
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
index a3ba779a08..2e4d9d1309 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -37,12 +37,14 @@ import java.util.Set;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.ThreadId;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
@@ -61,6 +63,8 @@ public class CassandraThreadLookupDAO {
private final PreparedStatement select;
private final PreparedStatement selectAll;
private final PreparedStatement delete;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraThreadLookupDAO(CqlSession session) {
@@ -88,6 +92,10 @@ public class CassandraThreadLookupDAO {
.where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)),
column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID)))
.build());
+
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"THREAD-LOOKUP");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"THREAD-LOOKUP");
}
public Mono<Void> insert(MessageId messageId, ThreadId threadId, Username
username, Set<Integer> hashMimeMessageIds) {
@@ -95,13 +103,15 @@ public class CassandraThreadLookupDAO {
.set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID)
.set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
.set(USERNAME, username.asString(), TypeCodecs.TEXT)
- .set(MIME_MESSAGE_IDS, hashMimeMessageIds, SET_OF_INTS_CODEC));
+ .set(MIME_MESSAGE_IDS, hashMimeMessageIds, SET_OF_INTS_CODEC)
+ .setExecutionProfile(writeProfile));
}
public Flux<MessageId> selectAll(ThreadId threadId) {
return executor.executeRows(
selectAll.bind()
- .set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID))
+ .set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(row -> CassandraMessageId.of(row.get(MESSAGE_ID,
TypeCodecs.TIMEUUID)));
}
@@ -109,14 +119,16 @@ public class CassandraThreadLookupDAO {
return executor.executeSingleRow(
select.bind()
.set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
- .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID))
+ .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(readProfile))
.map(this::readRow);
}
public Mono<Void> deleteOneRow(ThreadId threadId, MessageId messageId) {
return executor.executeVoid(delete.bind()
.set(THREAD_ID, ((CassandraMessageId)
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
- .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID));
+ .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(),
TypeCodecs.TIMEUUID)
+ .setExecutionProfile(writeProfile));
}
private ThreadTablePartitionKey readRow(Row row) {
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index f8332a337f..3a12845669 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -39,6 +39,7 @@ import jakarta.inject.Inject;
import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.exception.MailboxException;
@@ -68,6 +69,7 @@ public class CassandraUidProvider implements UidProvider {
private final DriverExecutionProfile lwtProfile;
private final RetryBackoffSpec retrySpec;
private final CassandraConfiguration cassandraConfiguration;
+ private final DriverExecutionProfile readProfile;
@Inject
public CassandraUidProvider(CqlSession session, CassandraConfiguration
cassandraConfiguration) {
@@ -80,6 +82,7 @@ public class CassandraUidProvider implements UidProvider {
this.retrySpec =
Retry.backoff(cassandraConfiguration.getUidMaxRetry(), firstBackoff)
.scheduler(Schedulers.parallel());
this.cassandraConfiguration = cassandraConfiguration;
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "UID");
}
private PreparedStatement prepareSelect(CqlSession session) {
@@ -157,14 +160,20 @@ public class CassandraUidProvider implements UidProvider {
@Override
public Optional<MessageUid> lastUid(Mailbox mailbox) {
- return findHighestUid((CassandraId) mailbox.getMailboxId(),
Optional.of(lwtProfile).filter(any ->
cassandraConfiguration.isUidReadStrongConsistency()))
+ return findHighestUid((CassandraId) mailbox.getMailboxId(),
locateReadProfile())
.blockOptional()
.map(uid ->
uid.add(cassandraConfiguration.getUidModseqIncrement()));
}
+ private Optional<DriverExecutionProfile> locateReadProfile() {
+ return Optional.of(lwtProfile)
+ .filter(any -> cassandraConfiguration.isUidReadStrongConsistency())
+ .or(() -> Optional.of(readProfile));
+ }
+
@Override
public Mono<Optional<MessageUid>> lastUidReactive(Mailbox mailbox) {
- return findHighestUid((CassandraId) mailbox.getMailboxId(),
Optional.of(lwtProfile).filter(any ->
cassandraConfiguration.isUidReadStrongConsistency()))
+ return findHighestUid((CassandraId) mailbox.getMailboxId(),
locateReadProfile())
.map(uid ->
uid.add(cassandraConfiguration.getUidModseqIncrement()))
.map(Optional::of)
.switchIfEmpty(Mono.just(Optional.empty()));
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index b9a961949a..8406e6ab96 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -36,6 +36,7 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.acl.PositiveUserACLDiff;
@@ -45,6 +46,7 @@ import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.github.fge.lambdas.Throwing;
@@ -59,6 +61,8 @@ public class CassandraUserMailboxRightsDAO {
private final PreparedStatement insert;
private final PreparedStatement select;
private final PreparedStatement selectUser;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraUserMailboxRightsDAO(CqlSession session) {
@@ -67,6 +71,9 @@ public class CassandraUserMailboxRightsDAO {
this.insert = prepareInsert(session);
this.select = prepareSelect(session);
this.selectUser = prepareSelectAllForUser(session);
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"USER_ACL");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"USER_ACL");
}
private PreparedStatement prepareDelete(CqlSession session) {
@@ -113,7 +120,8 @@ public class CassandraUserMailboxRightsDAO {
.flatMap(entry -> cassandraAsyncExecutor.executeVoid(
delete.bind()
.setString(USER_NAME, entry.getKey().getName())
- .setUuid(MAILBOX_ID, cassandraId.asUuid())),
+ .setUuid(MAILBOX_ID, cassandraId.asUuid())
+ .setExecutionProfile(writeProfile)),
DEFAULT_CONCURRENCY);
}
@@ -123,7 +131,8 @@ public class CassandraUserMailboxRightsDAO {
insert.bind()
.setString(USER_NAME, entry.getKey().getName())
.setUuid(MAILBOX_ID, cassandraId.asUuid())
- .setString(RIGHTS, entry.getValue().serialize())),
+ .setString(RIGHTS, entry.getValue().serialize())
+ .setExecutionProfile(writeProfile)),
DEFAULT_CONCURRENCY);
}
@@ -131,7 +140,8 @@ public class CassandraUserMailboxRightsDAO {
return cassandraAsyncExecutor.executeSingleRowOptional(
select.bind()
.setString(USER_NAME, userName.asString())
- .setUuid(MAILBOX_ID, mailboxId.asUuid()))
+ .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .setExecutionProfile(readProfile))
.map(rowOptional ->
rowOptional.map(Throwing.function(row ->
Rfc4314Rights.fromSerializedRfc4314Rights(row.getString(RIGHTS)))));
}
@@ -139,7 +149,8 @@ public class CassandraUserMailboxRightsDAO {
public Flux<Pair<CassandraId, Rfc4314Rights>> listRightsForUser(Username
userName) {
return cassandraAsyncExecutor.executeRows(
selectUser.bind()
- .setString(USER_NAME, userName.asString()))
+ .setString(USER_NAME, userName.asString())
+ .setExecutionProfile(readProfile))
.map(Throwing.function(this::toPair));
}
diff --git
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 34972330f2..1a6ffab2f1 100644
---
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -832,7 +832,8 @@ public class CassandraMailboxManagerTest extends
MailboxManagerTest<CassandraMai
cassandraCluster.getConf(),
cassandraCluster.getTypesProvider(),
mock(BlobStore.class),
- new PlainBlobId.Factory());
+ new PlainBlobId.Factory(),
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
}
private CassandraThreadDAO threadDAO(CassandraCluster
cassandraCluster) {
diff --git
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
index 285f062113..fd891c9271 100644
---
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
+++
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
@@ -33,6 +33,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
+import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDataDefinition;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.PlainBlobId;
@@ -101,7 +102,8 @@ class CassandraMessageDAOV3Test {
cassandra.getConf(),
cassandra.getTypesProvider(),
blobStore,
- blobIdFactory);
+ blobIdFactory,
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(MAILBOX_ID,
messageId, messageUid))
diff --git
a/server/apps/distributed-app/sample-configuration/cassandra-driver.conf
b/server/apps/distributed-app/sample-configuration/cassandra-driver.conf
index af1ba36054..ac9816f9a1 100644
--- a/server/apps/distributed-app/sample-configuration/cassandra-driver.conf
+++ b/server/apps/distributed-app/sample-configuration/cassandra-driver.conf
@@ -133,15 +133,19 @@ datastax-java-driver {
}
CACHING {
basic.request.consistency = LOCAL_ONE
- basic.request.serial-consistency = LOCAL_ONE
basic.request.timeout = 100 milliseconds
}
OPTIMISTIC_CONSISTENCY_LEVEL {
basic.request.consistency = LOCAL_ONE
- basic.request.serial-consistency = LOCAL_ONE
}
BATCH {
basic.request.timeout = 1 hour
}
+ READ {
+ basic.request.consistency = QUORUM
+ }
+ WRITE {
+ basic.request.consistency = QUORUM
+ }
}
}
diff --git
a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
index 364ca65fb2..f3d7903e49 100644
---
a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
+++
b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
@@ -31,6 +31,7 @@ import java.util.List;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Domain;
import org.apache.james.dnsservice.api.DNSService;
import org.apache.james.domainlist.api.DomainListException;
@@ -38,6 +39,7 @@ import org.apache.james.domainlist.lib.AbstractDomainList;
import org.apache.james.util.ReactorUtils;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
@@ -49,6 +51,8 @@ public class CassandraDomainList extends AbstractDomainList {
private final PreparedStatement readStatement;
private final PreparedStatement insertStatement;
private final PreparedStatement removeStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraDomainList(DNSService dnsService, CqlSession session) {
@@ -72,11 +76,13 @@ public class CassandraDomainList extends AbstractDomainList
{
.whereColumn(DOMAIN).isEqualTo(bindMarker(DOMAIN))
.ifExists()
.build());
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"DOMAIN");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"DOMAIN");
}
@Override
protected List<Domain> getDomainListInternal() throws DomainListException {
- return executor.executeRows(readAllStatement.bind())
+ return
executor.executeRows(readAllStatement.bind().setExecutionProfile(readProfile))
.map(row -> Domain.of(row.get(0, TypeCodecs.TEXT)))
.collectList()
.block();
@@ -85,7 +91,8 @@ public class CassandraDomainList extends AbstractDomainList {
@Override
protected boolean containsDomainInternal(Domain domain) throws
DomainListException {
return executor.executeSingleRowOptional(readStatement.bind()
- .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
+ .set(DOMAIN, domain.asString(), TypeCodecs.TEXT)
+ .setExecutionProfile(readProfile))
.block()
.isPresent();
}
@@ -93,7 +100,8 @@ public class CassandraDomainList extends AbstractDomainList {
@Override
public Mono<Boolean> containsDomainReactive(Domain domain) {
return executor.executeSingleRowOptional(readStatement.bind()
- .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
+ .set(DOMAIN, domain.asString(), TypeCodecs.TEXT)
+ .setExecutionProfile(readProfile))
.handle(ReactorUtils.publishIfPresent())
.hasElement();
}
@@ -101,7 +109,8 @@ public class CassandraDomainList extends AbstractDomainList {
@Override
public void addDomain(Domain domain) throws DomainListException {
boolean executed = executor.executeReturnApplied(insertStatement.bind()
- .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
+ .set(DOMAIN, domain.asString(), TypeCodecs.TEXT)
+ .setExecutionProfile(writeProfile))
.block();
if (!executed) {
throw new DomainListException(domain.name() + " already exists.");
@@ -111,11 +120,11 @@ public class CassandraDomainList extends AbstractDomainList {
@Override
public void doRemoveDomain(Domain domain) throws DomainListException {
boolean executed = executor.executeReturnApplied(removeStatement.bind()
- .set(DOMAIN, domain.asString(), TypeCodecs.TEXT))
+ .set(DOMAIN, domain.asString(), TypeCodecs.TEXT)
+ .setExecutionProfile(writeProfile))
.block();
if (!executed) {
throw new DomainListException(domain.name() + " was not found");
}
}
-
}
\ No newline at end of file
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
index 4c258881f2..5415c06f95 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -32,10 +32,12 @@ import static org.apache.james.rrt.cassandra.tables.CassandraMappingsSourcesTabl
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.rrt.lib.Mapping;
import org.apache.james.rrt.lib.MappingSource;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import reactor.core.publisher.Flux;
@@ -47,6 +49,8 @@ public class CassandraMappingsSourcesDAO {
private final PreparedStatement deleteStatement;
private final PreparedStatement retrieveSourcesStatement;
private final PreparedStatement truncateStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraMappingsSourcesDAO(CqlSession session) {
@@ -70,26 +74,31 @@ public class CassandraMappingsSourcesDAO {
.build());
this.truncateStatement = session.prepare(truncate(TABLE_NAME).build());
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"RRT-SOURCE");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"RRT-SOURCE");
}
public Mono<Void> addMapping(Mapping mapping, MappingSource source) {
return executor.executeVoid(insertStatement.bind()
.setString(MAPPING_TYPE, mapping.getType().asPrefix())
.setString(MAPPING_VALUE, mapping.getMappingValue())
- .setString(SOURCE, source.asMailAddressString()));
+ .setString(SOURCE, source.asMailAddressString())
+ .setExecutionProfile(writeProfile));
}
Mono<Void> removeMapping(Mapping mapping, MappingSource source) {
return executor.executeVoid(deleteStatement.bind()
.setString(MAPPING_TYPE, mapping.getType().asPrefix())
.setString(MAPPING_VALUE, mapping.getMappingValue())
- .setString(SOURCE, source.asMailAddressString()));
+ .setString(SOURCE, source.asMailAddressString())
+ .setExecutionProfile(writeProfile));
}
public Flux<MappingSource> retrieveSources(Mapping mapping) {
return executor.executeRows(retrieveSourcesStatement.bind()
.setString(MAPPING_TYPE, mapping.getType().asPrefix())
- .setString(MAPPING_VALUE, mapping.getMappingValue()))
+ .setString(MAPPING_VALUE, mapping.getMappingValue())
+ .setExecutionProfile(readProfile))
.map(row -> MappingSource.parse(row.getString(SOURCE)));
}
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index c157932ede..0663b08f07 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -34,11 +34,13 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.rrt.lib.Mapping;
import org.apache.james.rrt.lib.MappingSource;
import org.apache.james.rrt.lib.MappingsImpl;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.common.collect.ImmutableList;
@@ -51,6 +53,8 @@ public class CassandraRecipientRewriteTableDAO {
private final PreparedStatement deleteStatement;
private final PreparedStatement retrieveMappingStatement;
private final PreparedStatement retrieveAllMappingsStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraRecipientRewriteTableDAO(CqlSession session) {
@@ -76,26 +80,31 @@ public class CassandraRecipientRewriteTableDAO {
.whereColumn(DOMAIN).isEqualTo(bindMarker(DOMAIN))
.whereColumn(MAPPING).isEqualTo(bindMarker(MAPPING))
.build());
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "RRT");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "RRT");
}
public Mono<Void> addMapping(MappingSource source, Mapping mapping) {
return executor.executeVoid(insertStatement.bind()
.setString(USER, source.getFixedUser())
.setString(DOMAIN, source.getFixedDomain())
- .setString(MAPPING, mapping.asString()));
+ .setString(MAPPING, mapping.asString())
+ .setExecutionProfile(writeProfile));
}
Mono<Void> removeMapping(MappingSource source, Mapping mapping) {
return executor.executeVoid(deleteStatement.bind()
.setString(USER, source.getFixedUser())
.setString(DOMAIN, source.getFixedDomain())
- .setString(MAPPING, mapping.asString()));
+ .setString(MAPPING, mapping.asString())
+ .setExecutionProfile(writeProfile));
}
Mono<MappingsImpl> retrieveMappings(MappingSource source) {
return executor.executeRows(retrieveMappingStatement.bind()
.setString(USER, source.getFixedUser())
- .setString(DOMAIN, source.getFixedDomain()))
+ .setString(DOMAIN, source.getFixedDomain())
+ .setExecutionProfile(readProfile))
.mapNotNull(row -> row.getString(MAPPING))
.collect(ImmutableList.toImmutableList())
.map(MappingsImpl::fromCollection)
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
index 077a0fa4e0..b448a426ea 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
@@ -39,6 +39,7 @@ import java.util.Optional;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.user.api.AlreadyExistInUsersRepositoryException;
import org.apache.james.user.api.UsersRepositoryException;
@@ -50,6 +51,7 @@ import org.apache.james.user.lib.model.DefaultUser;
import org.reactivestreams.Publisher;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -78,6 +80,8 @@ public class CassandraUsersDAO implements UsersDAO {
private final PreparedStatement getDelegatedToUsersStatement;
private final PreparedStatement addDelegatedToUsersStatement;
private final PreparedStatement removeDelegatedToUsersStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
private final Algorithm preferredAlgorithm;
private final HashingMode fallbackHashingMode;
@@ -156,6 +160,9 @@ public class CassandraUsersDAO implements UsersDAO {
.remove(DELEGATED_USERS, bindMarker(DELEGATED_USERS))
.whereColumn(NAME).isEqualTo(bindMarker(NAME))
.build());
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session, "USER");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"USER");
}
@VisibleForTesting
@@ -172,7 +179,8 @@ public class CassandraUsersDAO implements UsersDAO {
private Mono<DefaultUser> getUserByNameReactive(Username name) {
return executor.executeSingleRow(
getUserStatement.bind()
- .setString(NAME, name.asString()))
+ .setString(NAME, name.asString())
+ .setExecutionProfile(readProfile))
.filter(row -> row.getString(ALGORITHM) != null)
.map(row -> new DefaultUser(Username.of(row.getString(NAME)),
row.getString(PASSWORD),
Algorithm.of(row.getString(ALGORITHM), fallbackHashingMode),
preferredAlgorithm));
@@ -180,7 +188,8 @@ public class CassandraUsersDAO implements UsersDAO {
public Mono<Boolean> exist(Username name) {
return executor.executeReturnExists(getUserStatement.bind()
- .setString(NAME, name.asString()));
+ .setString(NAME, name.asString())
+ .setExecutionProfile(readProfile));
}
@Override
@@ -192,7 +201,8 @@ public class CassandraUsersDAO implements UsersDAO {
.setString(REALNAME, defaultUser.getUserName().asString())
.setString(PASSWORD, defaultUser.getHashedPassword())
.setString(ALGORITHM,
defaultUser.getHashAlgorithm().asString())
- .setString(NAME, defaultUser.getUserName().asString()))
+ .setString(NAME, defaultUser.getUserName().asString())
+ .setExecutionProfile(writeProfile))
.block();
if (!executed) {
@@ -204,11 +214,13 @@ public class CassandraUsersDAO implements UsersDAO {
BatchStatementBuilder batchBuilder = new
BatchStatementBuilder(BatchType.LOGGED);
batchBuilder.addStatement(addAuthorizedUsersStatement.bind()
.setString(NAME, baseUser.asString())
- .setSet(AUTHORIZED_USERS,
ImmutableSet.of(userWithAccess.asString()), String.class));
+ .setSet(AUTHORIZED_USERS,
ImmutableSet.of(userWithAccess.asString()), String.class)
+ .setExecutionProfile(writeProfile));
if (targetUserExists) {
batchBuilder.addStatement(addDelegatedToUsersStatement.bind()
.setString(NAME, userWithAccess.asString())
- .setSet(DELEGATED_USERS, ImmutableSet.of(baseUser.asString()),
String.class));
+ .setSet(DELEGATED_USERS, ImmutableSet.of(baseUser.asString()),
String.class)
+ .setExecutionProfile(writeProfile));
}
return executor.executeVoid(batchBuilder.build());
@@ -218,10 +230,12 @@ public class CassandraUsersDAO implements UsersDAO {
return executor.executeVoid(new BatchStatementBuilder(BatchType.LOGGED)
.addStatement(removeAuthorizedUsersStatement.bind()
.setString(NAME, baseUser.asString())
- .setSet(AUTHORIZED_USERS,
ImmutableSet.of(userWithAccess.asString()), String.class))
+ .setSet(AUTHORIZED_USERS,
ImmutableSet.of(userWithAccess.asString()), String.class)
+ .setExecutionProfile(writeProfile))
.addStatement(removeDelegatedToUsersStatement.bind()
.setString(NAME, userWithAccess.asString())
- .setSet(DELEGATED_USERS, ImmutableSet.of(baseUser.asString()),
String.class))
+ .setSet(DELEGATED_USERS, ImmutableSet.of(baseUser.asString()),
String.class)
+ .setExecutionProfile(writeProfile))
.build());
}
@@ -233,9 +247,11 @@ public class CassandraUsersDAO implements UsersDAO {
authorizedList.forEach(username -> batch.addStatement(
removeDelegatedToUsersStatement.bind()
.setString(NAME, username.asString())
- .setSet(DELEGATED_USERS,
ImmutableSet.of(baseUser.asString()), String.class)));
+ .setSet(DELEGATED_USERS,
ImmutableSet.of(baseUser.asString()), String.class))
+ .setExecutionProfile(writeProfile));
batch.addStatement(removeAllAuthorizedUsersStatement.bind()
- .setString(NAME, baseUser.asString()));
+ .setString(NAME, baseUser.asString())
+ .setExecutionProfile(writeProfile));
return batch.build();
})
.flatMap(executor::executeVoid);
@@ -244,7 +260,8 @@ public class CassandraUsersDAO implements UsersDAO {
public Flux<Username> getAuthorizedUsers(Username name) {
return executor.executeSingleRow(
getAuthorizedUsersStatement.bind()
- .setString(NAME, name.asString()))
+ .setString(NAME, name.asString())
+ .setExecutionProfile(readProfile))
.mapNotNull(row -> row.getSet(AUTHORIZED_USERS, String.class))
.flatMapIterable(set -> set)
.map(Username::of);
@@ -254,17 +271,20 @@ public class CassandraUsersDAO implements UsersDAO {
return executor.executeVoid(new BatchStatementBuilder(BatchType.LOGGED)
.addStatement(removeAuthorizedUsersStatement.bind()
.setString(NAME, delegatedToUser.asString())
- .setSet(AUTHORIZED_USERS,
ImmutableSet.of(baseUser.asString()), String.class))
+ .setSet(AUTHORIZED_USERS,
ImmutableSet.of(baseUser.asString()), String.class)
+ .setExecutionProfile(writeProfile))
.addStatement(removeDelegatedToUsersStatement.bind()
.setString(NAME, baseUser.asString())
- .setSet(DELEGATED_USERS,
ImmutableSet.of(delegatedToUser.asString()), String.class))
+ .setSet(DELEGATED_USERS,
ImmutableSet.of(delegatedToUser.asString()), String.class)
+ .setExecutionProfile(writeProfile))
.build());
}
public Flux<Username> getDelegatedToUsers(Username name) {
return executor.executeSingleRow(
getDelegatedToUsersStatement.bind()
- .setString(NAME, name.asString()))
+ .setString(NAME, name.asString())
+ .setExecutionProfile(readProfile))
.mapNotNull(row -> row.getSet(DELEGATED_USERS, String.class))
.flatMapIterable(set -> set)
.map(Username::of);
@@ -274,7 +294,8 @@ public class CassandraUsersDAO implements UsersDAO {
public void removeUser(Username name) throws UsersRepositoryException {
boolean executed = executor.executeReturnApplied(
removeUserStatement.bind()
- .setString(NAME, name.asString()))
+ .setString(NAME, name.asString())
+ .setExecutionProfile(writeProfile))
.block();
if (!executed) {
@@ -294,7 +315,8 @@ public class CassandraUsersDAO implements UsersDAO {
@Override
public int countUsers() {
- return executor.executeSingleRow(countUserStatement.bind())
+ return executor.executeSingleRow(countUserStatement.bind()
+ .setExecutionProfile(readProfile))
.map(row -> Ints.checkedCast(row.getLong(0)))
.block();
}
@@ -323,7 +345,8 @@ public class CassandraUsersDAO implements UsersDAO {
.setString(NAME, user.getUserName().asString())
.setString(REALNAME, user.getUserName().asString())
.setString(PASSWORD, user.getHashedPassword())
- .setString(ALGORITHM, user.getHashAlgorithm().asString()))
+ .setString(ALGORITHM, user.getHashAlgorithm().asString())
+ .setExecutionProfile(writeProfile))
.block();
if (!executed) {
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraNotificationRegistryDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraNotificationRegistryDAO.java
index 3f6b8b80c8..8f75c9caa5 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraNotificationRegistryDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraNotificationRegistryDAO.java
@@ -30,11 +30,13 @@ import java.util.Optional;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.vacation.api.AccountId;
import org.apache.james.vacation.api.RecipientId;
import org.apache.james.vacation.cassandra.tables.CassandraNotificationTable;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
@@ -48,6 +50,8 @@ public class CassandraNotificationRegistryDAO {
private final PreparedStatement registerWithTTLStatement;
private final PreparedStatement isRegisteredStatement;
private final PreparedStatement flushStatement;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraNotificationRegistryDAO(CqlSession session) {
@@ -66,6 +70,8 @@ public class CassandraNotificationRegistryDAO {
this.flushStatement =
session.prepare(deleteFrom(CassandraNotificationTable.TABLE_NAME)
.whereColumn(CassandraNotificationTable.ACCOUNT_ID).isEqualTo(bindMarker(CassandraNotificationTable.ACCOUNT_ID))
.build());
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"VACATION-REGISTRY");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"VACATION-REGISTRY");
}
private RegularInsert createInsert() {
@@ -80,20 +86,23 @@ public class CassandraNotificationRegistryDAO {
.map(value -> registerWithTTLStatement.bind().setInt(TTL,
value))
.orElse(registerStatement.bind())
.setString(CassandraNotificationTable.ACCOUNT_ID,
accountId.getIdentifier())
- .setString(CassandraNotificationTable.RECIPIENT_ID,
recipientId.getAsString()));
+ .setString(CassandraNotificationTable.RECIPIENT_ID,
recipientId.getAsString())
+ .setExecutionProfile(writeProfile));
}
public Mono<Boolean> isRegistered(AccountId accountId, RecipientId
recipientId) {
return cassandraAsyncExecutor.executeSingleRowOptional(
isRegisteredStatement.bind()
.setString(CassandraNotificationTable.ACCOUNT_ID,
accountId.getIdentifier())
- .setString(CassandraNotificationTable.RECIPIENT_ID,
recipientId.getAsString()))
+ .setString(CassandraNotificationTable.RECIPIENT_ID,
recipientId.getAsString())
+ .setExecutionProfile(readProfile))
.map(Optional::isPresent);
}
public Mono<Void> flush(AccountId accountId) {
return cassandraAsyncExecutor.executeVoid(
flushStatement.bind()
- .setString(CassandraNotificationTable.ACCOUNT_ID,
accountId.getIdentifier()));
+ .setString(CassandraNotificationTable.ACCOUNT_ID,
accountId.getIdentifier())
+ .setExecutionProfile(writeProfile));
}
}
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraVacationDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraVacationDAO.java
index 4255019406..2e27907b36 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraVacationDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/vacation/cassandra/CassandraVacationDAO.java
@@ -35,6 +35,7 @@ import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import
org.apache.james.backends.cassandra.init.CassandraZonedDateTimeDataDefinition;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.util.ValuePatch;
import org.apache.james.vacation.api.AccountId;
import org.apache.james.vacation.api.Vacation;
@@ -42,6 +43,7 @@ import org.apache.james.vacation.api.VacationPatch;
import org.apache.james.vacation.cassandra.tables.CassandraVacationTable;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
@@ -56,6 +58,8 @@ public class CassandraVacationDAO {
private final PreparedStatement readStatement;
private final UserDefinedType zonedDateTimeUserType;
private final BiFunction<VacationPatch, RegularInsert, RegularInsert>
insertGeneratorPipeline;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraVacationDAO(CqlSession session, CassandraTypesProvider
cassandraTypesProvider) {
@@ -77,6 +81,9 @@ public class CassandraVacationDAO {
.stream()
.reduce((vacation, insert) -> insert,
(a, b) -> (vacation, insert) -> b.apply(vacation,
a.apply(vacation, insert)));
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"VACATION");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"VACATION");
}
public Mono<Void> modifyVacation(AccountId accountId, VacationPatch
vacationPatch) {
@@ -84,12 +91,14 @@ public class CassandraVacationDAO {
createSpecificUpdate(vacationPatch,
insertInto(CassandraVacationTable.TABLE_NAME)
.value(CassandraVacationTable.ACCOUNT_ID,
literal(accountId.getIdentifier())))
- .build());
+ .build()
+ .setExecutionProfile(writeProfile));
}
public Mono<Optional<Vacation>> retrieveVacation(AccountId accountId) {
return
cassandraAsyncExecutor.executeSingleRowOptional(readStatement.bind()
- .setString(CassandraVacationTable.ACCOUNT_ID,
accountId.getIdentifier()))
+ .setString(CassandraVacationTable.ACCOUNT_ID,
accountId.getIdentifier())
+ .setExecutionProfile(readProfile))
.map(optional -> optional.map(row -> Vacation.builder()
.enabled(row.getBoolean(CassandraVacationTable.IS_ENABLED))
.fromDate(retrieveDate(row, CassandraVacationTable.FROM_DATE))
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
index 4437a1bac3..0649c33e08 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
@@ -43,6 +43,7 @@ import java.util.Set;
import jakarta.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.change.TypeStateFactory;
import org.apache.james.jmap.api.model.DeviceClientId;
@@ -55,6 +56,7 @@ import org.apache.james.jmap.api.model.TypeName;
import org.apache.james.jmap.api.model.VerificationCode;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
@@ -74,6 +76,8 @@ public class CassandraPushSubscriptionDAO {
private final PreparedStatement selectAll;
private final PreparedStatement deleteOne;
private final PreparedStatement deleteAll;
+ private final DriverExecutionProfile readProfile;
+ private final DriverExecutionProfile writeProfile;
@Inject
public CassandraPushSubscriptionDAO(CqlSession session, TypeStateFactory
typeStateFactory) {
@@ -106,6 +110,9 @@ public class CassandraPushSubscriptionDAO {
.build());
this.typeStateFactory = typeStateFactory;
+
+ this.readProfile = ProfileLocator.READ.locateProfile(session,
"PUSH-SUBSCRIPTION");
+ this.writeProfile = ProfileLocator.WRITE.locateProfile(session,
"PUSH-SUBSCRIPTION");
}
public Mono<PushSubscription> insert(Username username, PushSubscription
subscription) {
@@ -128,24 +135,26 @@ public class CassandraPushSubscriptionDAO {
.ifPresent(keys ->
insertSubscription.setString(ENCRYPT_PUBLIC_KEY, keys.p256dh())
.setString(ENCRYPT_AUTH_SECRET, keys.auth()));
- return executor.executeVoid(insertSubscription.build())
+ return
executor.executeVoid(insertSubscription.build().setExecutionProfile(writeProfile))
.thenReturn(subscription);
}
public Flux<PushSubscription> selectAll(Username username) {
- return executor.executeRows(selectAll.bind().setString(USER,
username.asString()))
+ return executor.executeRows(selectAll.bind().setString(USER,
username.asString()).setExecutionProfile(readProfile))
.map(this::toPushSubscription);
}
public Mono<Void> deleteOne(Username username, String deviceClientId) {
return executor.executeVoid(deleteOne.bind()
.setString(USER, username.asString())
- .setString(DEVICE_CLIENT_ID, deviceClientId));
+ .setString(DEVICE_CLIENT_ID, deviceClientId)
+ .setExecutionProfile(writeProfile));
}
public Mono<Void> deleteAll(Username username) {
return executor.executeVoid(deleteAll.bind()
- .setString(USER, username.asString()));
+ .setString(USER, username.asString())
+ .setExecutionProfile(writeProfile));
}
private PushSubscription toPushSubscription(Row row) {
diff --git a/server/protocols/protocols-pop3-distributed/src/test/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesServiceTest.java b/server/protocols/protocols-pop3-distributed/src/test/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesServiceTest.java
index 6a5af68586..1b0b611780 100644
--- a/server/protocols/protocols-pop3-distributed/src/test/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesServiceTest.java
+++ b/server/protocols/protocols-pop3-distributed/src/test/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesServiceTest.java
@@ -169,7 +169,8 @@ public class MetaDataFixInconsistenciesServiceTest {
cassandra.getTypesProvider(),
CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new
RecordingMetricFactory())
.passthrough(),
- new PlainBlobId.Factory());
+ new PlainBlobId.Factory(),
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
testee = new MetaDataFixInconsistenciesService(imapUidDAO,
pop3MetadataStore, cassandraMessageDAOV3);
}
diff --git a/server/protocols/webadmin/webadmin-pop3/src/test/java/org/apache/james/pop3/webadmin/Pop3MetaDataFixInconsistenciesRoutesTest.java b/server/protocols/webadmin/webadmin-pop3/src/test/java/org/apache/james/pop3/webadmin/Pop3MetaDataFixInconsistenciesRoutesTest.java
index 76cef2dbdd..047765ec6e 100644
--- a/server/protocols/webadmin/webadmin-pop3/src/test/java/org/apache/james/pop3/webadmin/Pop3MetaDataFixInconsistenciesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-pop3/src/test/java/org/apache/james/pop3/webadmin/Pop3MetaDataFixInconsistenciesRoutesTest.java
@@ -186,7 +186,8 @@ class Pop3MetaDataFixInconsistenciesRoutesTest {
cassandra.getTypesProvider(),
CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new
RecordingMetricFactory())
.passthrough(),
- new PlainBlobId.Factory());
+ new PlainBlobId.Factory(),
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
MetaDataFixInconsistenciesService fixInconsistenciesService = new
MetaDataFixInconsistenciesService(imapUidDAO, pop3MetadataStore,
cassandraMessageDAOV3);
taskManager = new MemoryTaskManager(new Hostname("foo"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]