This is an automated email from the ASF dual-hosted git repository.
imbajin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 836b34832 fix(server): auto-recover session after Cassandra restart
(#2997)
836b34832 is described below
commit 836b34832b485df8b4cae02880a489ebc516cf6d
Author: Davide Polato <[email protected]>
AuthorDate: Sat Apr 25 22:13:06 2026 +0200
fix(server): auto-recover session after Cassandra restart (#2997)
- Reset driver session after each transient failure in executeWithRetry()
so retries reopen cleanly via lazy open()
- Remove redundant finally block in reconnectIfNeeded(); null session
directly on DriverException
- Store retryBaseDelay as field, reuse in open() (removes double-read)
- One-time LOG.warn via AtomicBoolean for commitAsync() retry gap
- Tighten defaults: max_delay 60s→10s, max_retries 10→3, interval 5s→1s
- Wire retry config via HugeConfig in tests; add cross-validator tests
---
.../backend/store/cassandra/CassandraOptions.java | 43 ++++
.../store/cassandra/CassandraSessionPool.java | 229 ++++++++++++++++++-
.../hugegraph/unit/cassandra/CassandraTest.java | 244 ++++++++++++++++++++-
3 files changed, 510 insertions(+), 6 deletions(-)
diff --git
a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
index a9ccf9776..6cb3b4138 100644
---
a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
+++
b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
@@ -130,4 +130,47 @@ public class CassandraOptions extends OptionHolder {
positiveInt(),
12 * 60 * 60
);
+
+ public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
+ new ConfigOption<>(
+ "cassandra.reconnect_base_delay",
+ "The base delay in milliseconds used by the driver's " +
+ "exponential reconnection policy when a Cassandra host " +
+ "becomes unreachable.",
+ rangeInt(100L, Long.MAX_VALUE),
+ 1000L
+ );
+
+ public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
+ new ConfigOption<>(
+ "cassandra.reconnect_max_delay",
+ "The maximum delay in milliseconds used by the driver's " +
+ "exponential reconnection policy when a Cassandra host " +
+ "becomes unreachable.",
+ rangeInt(1000L, Long.MAX_VALUE),
+ 10_000L
+ );
+
+ public static final ConfigOption<Integer> CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS =
+ new ConfigOption<>(
+ "cassandra.query_retry_max_attempts",
+ "The maximum number of retry attempts applied at query-time when " +
+ "a Cassandra host is temporarily unreachable. " +
+ "OperationTimedOutException is retried only for " +
+ "idempotent statements. " +
+ "Set to 0 to disable query-time retries.",
+ rangeInt(0, Integer.MAX_VALUE),
+ 3
+ );
+
+ public static final ConfigOption<Long> CASSANDRA_QUERY_RETRY_INTERVAL =
+ new ConfigOption<>(
+ "cassandra.query_retry_interval",
+ "The interval in milliseconds between query-time retries " +
+ "when a Cassandra host is temporarily unreachable. The " +
+ "actual wait grows with exponential backoff, capped at " +
+ "cassandra.reconnect_max_delay.",
+ rangeInt(100L, Long.MAX_VALUE),
+ 1000L
+ );
}
diff --git
a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
index 7a9ffa2b9..a217d5344 100644
---
a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
+++
b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
@@ -20,12 +20,15 @@ package org.apache.hugegraph.backend.store.cassandra;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hugegraph.backend.BackendException;
import
org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
@@ -34,22 +37,67 @@ import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
public class CassandraSessionPool extends BackendSessionPool {
+ private static final Logger LOG = Log.logger(CassandraSessionPool.class);
+
private static final int SECOND = 1000;
+ private static final String HEALTH_CHECK_CQL =
+ "SELECT now() FROM system.local";
+
+ /**
+ * Guards the one-time JVM-wide warning about {@code commitAsync()} not
+ * being covered by query-time retries. {@link CassandraSessionPool} is
+ * instantiated once per backend store per graph, so without this guard
+ * the warning would fire many times on startup for a structural
+ * limitation that does not change between instances.
+ */
+ private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED =
+ new AtomicBoolean(false);
private Cluster cluster;
private final String keyspace;
+ private final int maxRetries;
+ private final long retryInterval;
+ private final long retryBaseDelay;
+ private final long retryMaxDelay;
public CassandraSessionPool(HugeConfig config,
String keyspace, String store) {
super(config, keyspace + "/" + store);
this.cluster = null;
this.keyspace = keyspace;
+ this.maxRetries = config.get(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS);
+ this.retryInterval = config.get(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL);
+ long reconnectBase = config.get(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
+ long reconnectMax = config.get(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
+ E.checkArgument(reconnectMax >= reconnectBase,
+ "'%s' (%s) must be >= '%s' (%s)",
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
+ reconnectMax,
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
+ reconnectBase);
+ this.retryBaseDelay = reconnectBase;
+ this.retryMaxDelay = reconnectMax;
+
+ if (this.maxRetries > 0 &&
+ ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) {
+ LOG.warn("cassandra.query_retry_max_attempts={} applies to sync commit()" +
+ " only. commitAsync() has no retry protection.", this.maxRetries);
+ }
}
@Override
@@ -86,6 +134,12 @@ public class CassandraSessionPool extends
BackendSessionPool {
builder.withSocketOptions(socketOptions);
+ // Reconnection policy: let driver keep retrying nodes in background
+ // with exponential backoff after they go down (see issue #2740).
+ builder.withReconnectionPolicy(
+ new ExponentialReconnectionPolicy(this.retryBaseDelay,
+ this.retryMaxDelay));
+
// Credential options
String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
@@ -161,7 +215,7 @@ public class CassandraSessionPool extends
BackendSessionPool {
@Override
public ResultSet commit() {
- ResultSet rs = this.session.execute(this.batch);
+ ResultSet rs = this.executeWithRetry(this.batch);
// Clear batch if execute() successfully (retained if failed)
this.batch.clear();
return rs;
@@ -169,12 +223,25 @@ public class CassandraSessionPool extends
BackendSessionPool {
public void commitAsync() {
Collection<Statement> statements = this.batch.getStatements();
+ if (statements.isEmpty()) {
+ this.batch.clear();
+ return;
+ }
int count = 0;
int processors = Math.min(statements.size(), 1023);
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
+ com.datastax.driver.core.Session driverSession =
+ this.sessionForAsyncCommit();
for (Statement s : statements) {
- ResultSetFuture future = this.session.executeAsync(s);
+ // TODO: track async retry support in a follow-up issue.
+ // commitAsync() bypasses executeWithRetry().
+ // During a Cassandra restart, async writes may fail with
+ // NoHostAvailableException even when maxRetries > 0. Callers
+ // must handle ResultSetFuture failures surfaced by
+ // getUninterruptibly(). A follow-up issue should wrap each
+ // future with retry semantics.
+ ResultSetFuture future = driverSession.executeAsync(s);
results.add(future);
if (++count > processors) {
@@ -197,15 +264,99 @@ public class CassandraSessionPool extends
BackendSessionPool {
}
public ResultSet execute(Statement statement) {
- return this.session.execute(statement);
+ return this.executeWithRetry(statement);
}
public ResultSet execute(String statement) {
- return this.session.execute(statement);
+ return this.executeWithRetry(new SimpleStatement(statement));
}
public ResultSet execute(String statement, Object... args) {
- return this.session.execute(statement, args);
+ return this.executeWithRetry(new SimpleStatement(statement, args));
+ }
+
+ /**
+ * Execute a statement, retrying on transient connectivity failures
+ * (NoHostAvailableException / OperationTimedOutException). The driver
+ * itself keeps retrying connections in the background via the
+ * reconnection policy, so once Cassandra comes back online, a
+ * subsequent attempt here will succeed without restarting the server.
+ *
+ * <p>OperationTimedOutException is only retried for statements marked
+ * idempotent; otherwise a timed-out mutation might be applied once by
+ * Cassandra and then duplicated by a client-side retry.
+ *
+ * <p>If the driver session has been discarded (e.g. by
+ * {@link #reconnectIfNeeded()} after a failed health-check) it is
+ * lazily reopened at the start of each attempt.
+ *
+ * <p><b>Blocking note:</b> retries block the calling thread via
+ * {@link Thread#sleep(long)}. Worst-case a single call blocks for
+ * {@code maxRetries * retryMaxDelay} ms. Under high-throughput
+ * workloads concurrent threads may pile up in {@code sleep()} during
+ * a Cassandra outage. For such deployments lower
+ * {@code cassandra.query_retry_max_attempts} (default 3) and
+ * {@code cassandra.reconnect_max_delay} (default 10000ms) so the
+ * request fails fast and pressure is released back to the caller.
+ */
+ private ResultSet executeWithRetry(Statement statement) {
+ int retries = CassandraSessionPool.this.maxRetries;
+ long interval = CassandraSessionPool.this.retryInterval;
+ long maxDelay = CassandraSessionPool.this.retryMaxDelay;
+ DriverException lastError = null;
+ for (int attempt = 0; attempt <= retries; attempt++) {
+ try {
+ if (this.session == null || this.session.isClosed()) {
+ // Lazy reopen: may itself throw NHAE while
+ // Cassandra is still unreachable; the catch below
+ // treats that as a transient failure.
+ this.session = null;
+ this.open();
+ }
+ return this.session.execute(statement);
+ } catch (NoHostAvailableException | OperationTimedOutException e) {
+ lastError = e;
+ if (e instanceof OperationTimedOutException &&
+ !Boolean.TRUE.equals(statement.isIdempotent())) {
+ throw new BackendException(
+ "Cassandra query timed out and won't be " +
+ "retried because the statement is not " +
+ "marked idempotent", e);
+ }
+ if (attempt >= retries) {
+ break;
+ }
+ long cap = maxDelay > 0 ? maxDelay : interval;
+ long shift = 1L << Math.min(attempt, 20);
+ long delay;
+ try {
+ // Guard against Long overflow when retryInterval is huge.
+ delay = Math.min(Math.multiplyExact(interval, shift), cap);
+ } catch (ArithmeticException overflow) {
+ delay = cap;
+ }
+ LOG.warn("Cassandra temporarily unavailable ({}), " +
+ "retry {}/{} in {} ms",
+ e.getClass().getSimpleName(), attempt + 1,
+ retries, delay);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new BackendException("Interrupted while " +
+ "waiting to retry " +
+ "Cassandra query", ie);
+ }
+ }
+ }
+ // Preserve original exception as cause (stack trace + type) by
+ // pre-formatting the message and using the (String, Throwable)
+ // constructor explicitly to avoid ambiguity with varargs overloads.
+ String msg = String.format(
+ "Failed to execute Cassandra query after %s retries: %s",
+ retries,
+ lastError == null ? "<null>" : lastError.getMessage());
+ throw new BackendException(msg, lastError);
}
private void tryOpen() {
@@ -217,6 +368,24 @@ public class CassandraSessionPool extends
BackendSessionPool {
}
}
+ private com.datastax.driver.core.Session sessionForAsyncCommit() {
+ if (this.session == null || this.session.isClosed()) {
+ this.session = null;
+ try {
+ this.open();
+ } catch (DriverException e) {
+ throw new BackendException(
+ "Failed to open Cassandra session for async commit",
+ e);
+ }
+ }
+ if (this.session == null) {
+ throw new BackendException(
+ "Cassandra session is unavailable for async commit");
+ }
+ return this.session;
+ }
+
@Override
public void open() {
this.opened = true;
@@ -255,6 +424,56 @@ public class CassandraSessionPool extends
BackendSessionPool {
return this.batch.size() > 0;
}
+ /**
+ * Periodic liveness probe invoked by {@link BackendSessionPool} to
+ * recover thread-local sessions after Cassandra has been restarted.
+ * Reopens the driver session if it was closed and pings the cluster
+ * with a lightweight query. On failure the session is discarded via
+ * {@link #reset()} so the next call to
+ * {@link #executeWithRetry(Statement)} reopens it; any exception
+ * here is swallowed so the caller can still issue the real query.
+ */
+ @Override
+ public void reconnectIfNeeded() {
+ if (!this.opened) {
+ return;
+ }
+ try {
+ if (this.session == null || this.session.isClosed()) {
+ this.session = null;
+ this.tryOpen();
+ }
+ if (this.session != null) {
+ this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
+ }
+ } catch (NoHostAvailableException | OperationTimedOutException e) {
+ LOG.debug("Cassandra health-check failed, resetting session: {}",
+ e.getMessage());
+ this.reset();
+ }
+ }
+
+ /**
+ * Force-close the driver session so it is re-opened on the next
+ * {@link #opened()} call. Used when a failure is observed and we
+ * want to start fresh on the next attempt.
+ */
+ @Override
+ public void reset() {
+ if (this.session == null) {
+ return;
+ }
+ try {
+ this.session.close();
+ } catch (Exception e) {
+ // Do not swallow Error (OOM / StackOverflow); only log
+ // ordinary exceptions raised by the driver on close.
+ LOG.warn("Failed to reset Cassandra session", e);
+ } finally {
+ this.session = null;
+ }
+ }
+
public Collection<Statement> statements() {
return this.batch.getStatements();
}
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
index ef5a8e896..6445fc38b 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
@@ -17,11 +17,15 @@
package org.apache.hugegraph.unit.cassandra;
+import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.Map;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.cassandra.CassandraOptions;
+import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool;
import org.apache.hugegraph.backend.store.cassandra.CassandraStore;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.OptionSpace;
@@ -30,7 +34,14 @@ import org.apache.hugegraph.testutil.Whitebox;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
+import org.mockito.Mockito;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -192,4 +203,235 @@ public class CassandraTest {
Whitebox.invokeStatic(CassandraStore.class, "parseReplica",
config);
});
}
+
+ @Test
+ public void testReconnectOptionsHaveSensibleDefaults() {
+ // Runtime-reconnection options must exist with non-zero defaults so
+ // HugeGraph keeps running when Cassandra restarts (issue #2740).
+ Assert.assertEquals(1000L, (long) CassandraOptions
+ .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue());
+ Assert.assertEquals(10_000L, (long) CassandraOptions
+ .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue());
+ Assert.assertEquals(3, (int) CassandraOptions
+ .CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.defaultValue());
+ Assert.assertEquals(1000L, (long) CassandraOptions
+ .CASSANDRA_QUERY_RETRY_INTERVAL.defaultValue());
+ }
+
+ @Test
+ public void testReconnectOptionsAreOverridable() {
+ String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name();
+ String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name();
+ String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS
+ .name();
+ String interval = CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name();
+
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(base, 500L);
+ conf.setProperty(max, 30_000L);
+ conf.setProperty(retries, 3);
+ conf.setProperty(interval, 1000L);
+ HugeConfig config = new HugeConfig(conf);
+
+ Assert.assertEquals(500L, (long) config.get(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY));
+ Assert.assertEquals(30_000L, (long) config.get(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY));
+ Assert.assertEquals(3, (int) config.get(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS));
+ Assert.assertEquals(1000L, (long) config.get(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL));
+ }
+
+ @Test
+ public void testQueryRetryAttemptsCanBeDisabled() {
+ String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS
+ .name();
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(retries, 0);
+ HugeConfig config = new HugeConfig(conf);
+ Assert.assertEquals(0, (int) config.get(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS));
+ }
+
+ @Test
+ public void testExecuteWithRetrySucceedsAfterTransientFailures() {
+ // Configure retry knobs via config so the pool reads them through
+ // the normal path (no Whitebox overrides on retry fields). Keep the
+ // values within the validators' lower bounds (base >= 100, max >=
+ // base, interval >= 100).
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+ HugeConfig config = new HugeConfig(conf);
+ CassandraSessionPool pool = new CassandraSessionPool(config,
+ "ks", "store");
+
+ com.datastax.driver.core.Session driverSession = Mockito.mock(
+ com.datastax.driver.core.Session.class);
+ ResultSet rs = Mockito.mock(ResultSet.class);
+ NoHostAvailableException transientFailure =
+ new NoHostAvailableException(Collections.emptyMap());
+ Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
+ .thenThrow(transientFailure)
+ .thenThrow(transientFailure)
+ .thenReturn(rs);
+
+ CassandraSessionPool.Session session = pool.new Session();
+ Whitebox.setInternalState(session, "session", driverSession);
+
+ ResultSet result = session.execute("SELECT now() FROM system.local");
+ Assert.assertSame(rs, result);
+ Mockito.verify(driverSession, Mockito.times(3))
+ .execute(Mockito.any(Statement.class));
+ Mockito.verify(driverSession, Mockito.never()).close();
+ }
+
+ @Test
+ public void testExecuteWithRetrySkipsNonIdempotentTimeoutRetry() {
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+ HugeConfig config = new HugeConfig(conf);
+ CassandraSessionPool pool = new CassandraSessionPool(config,
+ "ks", "store");
+
+ com.datastax.driver.core.Session driverSession = Mockito.mock(
+ com.datastax.driver.core.Session.class);
+ OperationTimedOutException timeout = new OperationTimedOutException(
+ new InetSocketAddress("127.0.0.1", 9042));
+ Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
+ .thenThrow(timeout);
+
+ CassandraSessionPool.Session session = pool.new Session();
+ Whitebox.setInternalState(session, "session", driverSession);
+
+ Assert.assertThrows(BackendException.class, () ->
+ session.execute("UPDATE counter SET value = value + 1"));
+ Mockito.verify(driverSession, Mockito.times(1))
+ .execute(Mockito.any(Statement.class));
+ }
+
+ @Test
+ public void testExecuteWithRetryAllowsIdempotentTimeoutRetry() {
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+ HugeConfig config = new HugeConfig(conf);
+ CassandraSessionPool pool = new CassandraSessionPool(config,
+ "ks", "store");
+
+ com.datastax.driver.core.Session driverSession = Mockito.mock(
+ com.datastax.driver.core.Session.class);
+ ResultSet rs = Mockito.mock(ResultSet.class);
+ OperationTimedOutException timeout = new OperationTimedOutException(
+ new InetSocketAddress("127.0.0.1", 9042));
+ SimpleStatement statement = new SimpleStatement(
+ "SELECT now() FROM system.local");
+ statement.setIdempotent(true);
+ Mockito.when(driverSession.execute(statement))
+ .thenThrow(timeout)
+ .thenReturn(rs);
+
+ CassandraSessionPool.Session session = pool.new Session();
+ Whitebox.setInternalState(session, "session", driverSession);
+
+ ResultSet result = session.execute(statement);
+ Assert.assertSame(rs, result);
+ Mockito.verify(driverSession, Mockito.times(2)).execute(statement);
+ }
+
+ @Test
+ public void testCommitAsyncOpensSessionBeforeExecuteAsync() {
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+ HugeConfig config = new HugeConfig(conf);
+ CassandraSessionPool pool = new CassandraSessionPool(config,
+ "ks", "store");
+
+ com.datastax.driver.core.Cluster mockCluster = Mockito.mock(
+ com.datastax.driver.core.Cluster.class);
+ com.datastax.driver.core.Session driverSession = Mockito.mock(
+ com.datastax.driver.core.Session.class);
+ ResultSetFuture future = Mockito.mock(ResultSetFuture.class);
+ Mockito.when(mockCluster.isClosed()).thenReturn(false);
+ Mockito.when(mockCluster.connect(Mockito.anyString()))
+ .thenReturn(driverSession);
+ Mockito.when(driverSession.executeAsync(Mockito.any(Statement.class)))
+ .thenReturn(future);
+ Whitebox.setInternalState(pool, "cluster", mockCluster);
+
+ CassandraSessionPool.Session session = pool.new Session();
+ Statement statement = new SimpleStatement(
+ "INSERT INTO system.local(key) VALUES ('test')");
+ session.add(statement);
+
+ session.commitAsync();
+
+ Mockito.verify(mockCluster, Mockito.times(1)).connect("ks");
+ Mockito.verify(driverSession, Mockito.times(1)).executeAsync(statement);
+ Mockito.verify(future, Mockito.times(1)).getUninterruptibly();
+ Assert.assertFalse(session.hasChanges());
+ }
+
+ @Test
+ public void testReconnectBaseDelayBelowMinimumRejected() {
+ // The validator on CASSANDRA_RECONNECT_BASE_DELAY is
+ // rangeInt(100L, Long.MAX_VALUE); values below 100 must be rejected
+ // at parse time. Setting the property as a String forces HugeConfig
+ // to run parseConvert() which invokes the range check.
+ Configuration conf = new PropertiesConfiguration();
+ Assert.assertThrows(Exception.class, () -> {
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
+ "50");
+ new HugeConfig(conf);
+ });
+ }
+
+ @Test
+ public void testReconnectMaxDelayLessThanBaseRejected() {
+ // Both values must pass their individual range validators with margin
+ // (base >= 100, max >= 1000), so the only thing that can throw is the
+ // E.checkArgument(max >= base) cross-check inside the pool ctor. Set
+ // all four retry/reconnect properties explicitly so the test does not
+ // depend on default values changing in CassandraOptions.
+ Configuration conf = new PropertiesConfiguration();
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 10_000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 2_000L);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+ conf.setProperty(
+ CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 1_000L);
+ HugeConfig config = new HugeConfig(conf);
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ new CassandraSessionPool(config, "ks", "store"));
+ }
}