This is an automated email from the ASF dual-hosted git repository.

imbajin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new 836b34832 fix(server): auto-recover session after Cassandra restart (#2997)
836b34832 is described below

commit 836b34832b485df8b4cae02880a489ebc516cf6d
Author: Davide Polato <[email protected]>
AuthorDate: Sat Apr 25 22:13:06 2026 +0200

    fix(server): auto-recover session after Cassandra restart (#2997)
    
    - Reset driver session after each transient failure in executeWithRetry()
      so retries reopen cleanly via lazy open()
    - Remove redundant finally block in reconnectIfNeeded(); null session
      directly on DriverException
    - Store retryBaseDelay as field, reuse in open() (removes double-read)
    - One-time LOG.warn via AtomicBoolean for commitAsync() retry gap
    - Tighten defaults: max_delay 60s→10s, max_retries 10→3, interval 5s→1s
    - Wire retry config via HugeConfig in tests; add cross-validator tests
---
 .../backend/store/cassandra/CassandraOptions.java  |  43 ++++
 .../store/cassandra/CassandraSessionPool.java      | 229 ++++++++++++++++++-
 .../hugegraph/unit/cassandra/CassandraTest.java    | 244 ++++++++++++++++++++-
 3 files changed, 510 insertions(+), 6 deletions(-)

diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
index a9ccf9776..6cb3b4138 100644
--- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
+++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
@@ -130,4 +130,47 @@ public class CassandraOptions extends OptionHolder {
                     positiveInt(),
                     12 * 60 * 60
             );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_base_delay",
+                    "The base delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(100L, Long.MAX_VALUE),
+                    1000L
+            );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_max_delay",
+                    "The maximum delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(1000L, Long.MAX_VALUE),
+                    10_000L
+            );
+
+    public static final ConfigOption<Integer> CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS =
+            new ConfigOption<>(
+                    "cassandra.query_retry_max_attempts",
+                    "The maximum number of retry attempts applied at query-time when " +
+                    "a Cassandra host is temporarily unreachable. " +
+                    "OperationTimedOutException is retried only for " +
+                    "idempotent statements. " +
+                    "Set to 0 to disable query-time retries.",
+                    rangeInt(0, Integer.MAX_VALUE),
+                    3
+            );
+
+    public static final ConfigOption<Long> CASSANDRA_QUERY_RETRY_INTERVAL =
+            new ConfigOption<>(
+                    "cassandra.query_retry_interval",
+                    "The interval in milliseconds between query-time retries " +
+                    "when a Cassandra host is temporarily unreachable. The " +
+                    "actual wait grows with exponential backoff, capped at " +
+                    "cassandra.reconnect_max_delay.",
+                    rangeInt(100L, Long.MAX_VALUE),
+                    1000L
+            );
 }
diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
index 7a9ffa2b9..a217d5344 100644
--- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
+++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
@@ -20,12 +20,15 @@ package org.apache.hugegraph.backend.store.cassandra;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
 import org.apache.hugegraph.backend.store.BackendSessionPool;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Cluster;
@@ -34,22 +37,67 @@ import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ProtocolOptions.Compression;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.DriverException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
 
 public class CassandraSessionPool extends BackendSessionPool {
 
+    private static final Logger LOG = Log.logger(CassandraSessionPool.class);
+
     private static final int SECOND = 1000;
+    private static final String HEALTH_CHECK_CQL =
+            "SELECT now() FROM system.local";
+
+    /**
+     * Guards the one-time JVM-wide warning about {@code commitAsync()} not
+     * being covered by query-time retries. {@link CassandraSessionPool} is
+     * instantiated once per backend store per graph, so without this guard
+     * the warning would fire many times on startup for a structural
+     * limitation that does not change between instances.
+     */
+    private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED =
+            new AtomicBoolean(false);
 
     private Cluster cluster;
     private final String keyspace;
+    private final int maxRetries;
+    private final long retryInterval;
+    private final long retryBaseDelay;
+    private final long retryMaxDelay;
 
     public CassandraSessionPool(HugeConfig config,
                                 String keyspace, String store) {
         super(config, keyspace + "/" + store);
         this.cluster = null;
         this.keyspace = keyspace;
+        this.maxRetries = config.get(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS);
+        this.retryInterval = config.get(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL);
+        long reconnectBase = config.get(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
+        long reconnectMax = config.get(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
+        E.checkArgument(reconnectMax >= reconnectBase,
+                        "'%s' (%s) must be >= '%s' (%s)",
+                        CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
+                        reconnectMax,
+                        CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
+                        reconnectBase);
+        this.retryBaseDelay = reconnectBase;
+        this.retryMaxDelay = reconnectMax;
+
+        if (this.maxRetries > 0 &&
+            ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) {
+            LOG.warn("cassandra.query_retry_max_attempts={} applies to sync commit()" +
+                     " only. commitAsync() has no retry protection.", this.maxRetries);
+        }
     }
 
     @Override
@@ -86,6 +134,12 @@ public class CassandraSessionPool extends BackendSessionPool {
 
         builder.withSocketOptions(socketOptions);
 
+        // Reconnection policy: let driver keep retrying nodes in background
+        // with exponential backoff after they go down (see issue #2740).
+        builder.withReconnectionPolicy(
+                new ExponentialReconnectionPolicy(this.retryBaseDelay,
+                                                  this.retryMaxDelay));
+
         // Credential options
         String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
         String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
@@ -161,7 +215,7 @@ public class CassandraSessionPool extends BackendSessionPool {
 
         @Override
         public ResultSet commit() {
-            ResultSet rs = this.session.execute(this.batch);
+            ResultSet rs = this.executeWithRetry(this.batch);
             // Clear batch if execute() successfully (retained if failed)
             this.batch.clear();
             return rs;
@@ -169,12 +223,25 @@ public class CassandraSessionPool extends BackendSessionPool {
 
         public void commitAsync() {
             Collection<Statement> statements = this.batch.getStatements();
+            if (statements.isEmpty()) {
+                this.batch.clear();
+                return;
+            }
 
             int count = 0;
             int processors = Math.min(statements.size(), 1023);
             List<ResultSetFuture> results = new ArrayList<>(processors + 1);
+            com.datastax.driver.core.Session driverSession =
+                    this.sessionForAsyncCommit();
             for (Statement s : statements) {
-                ResultSetFuture future = this.session.executeAsync(s);
+                // TODO: track async retry support in a follow-up issue.
+                // commitAsync() bypasses executeWithRetry().
+                // During a Cassandra restart, async writes may fail with
+                // NoHostAvailableException even when maxRetries > 0. Callers
+                // must handle ResultSetFuture failures surfaced by
+                // getUninterruptibly(). A follow-up issue should wrap each
+                // future with retry semantics.
+                ResultSetFuture future = driverSession.executeAsync(s);
                 results.add(future);
 
                 if (++count > processors) {
@@ -197,15 +264,99 @@ public class CassandraSessionPool extends BackendSessionPool {
         }
 
         public ResultSet execute(Statement statement) {
-            return this.session.execute(statement);
+            return this.executeWithRetry(statement);
         }
 
         public ResultSet execute(String statement) {
-            return this.session.execute(statement);
+            return this.executeWithRetry(new SimpleStatement(statement));
         }
 
         public ResultSet execute(String statement, Object... args) {
-            return this.session.execute(statement, args);
+            return this.executeWithRetry(new SimpleStatement(statement, args));
+        }
+
+        /**
+         * Execute a statement, retrying on transient connectivity failures
+         * (NoHostAvailableException / OperationTimedOutException). The driver
+         * itself keeps retrying connections in the background via the
+         * reconnection policy, so once Cassandra comes back online, a
+         * subsequent attempt here will succeed without restarting the server.
+         *
+         * <p>OperationTimedOutException is only retried for statements marked
+         * idempotent; otherwise a timed-out mutation might be applied once by
+         * Cassandra and then duplicated by a client-side retry.
+         *
+         * <p>If the driver session has been discarded (e.g. by
+         * {@link #reconnectIfNeeded()} after a failed health-check) it is
+         * lazily reopened at the start of each attempt.
+         *
+         * <p><b>Blocking note:</b> retries block the calling thread via
+         * {@link Thread#sleep(long)}. Worst-case a single call blocks for
+         * {@code maxRetries * retryMaxDelay} ms. Under high-throughput
+         * workloads concurrent threads may pile up in {@code sleep()} during
+         * a Cassandra outage. For such deployments lower
+         * {@code cassandra.query_retry_max_attempts} (default 3) and
+         * {@code cassandra.reconnect_max_delay} (default 10000ms) so the
+         * request fails fast and pressure is released back to the caller.
+         */
+        private ResultSet executeWithRetry(Statement statement) {
+            int retries = CassandraSessionPool.this.maxRetries;
+            long interval = CassandraSessionPool.this.retryInterval;
+            long maxDelay = CassandraSessionPool.this.retryMaxDelay;
+            DriverException lastError = null;
+            for (int attempt = 0; attempt <= retries; attempt++) {
+                try {
+                    if (this.session == null || this.session.isClosed()) {
+                        // Lazy reopen: may itself throw NHAE while
+                        // Cassandra is still unreachable; the catch below
+                        // treats that as a transient failure.
+                        this.session = null;
+                        this.open();
+                    }
+                    return this.session.execute(statement);
+                } catch (NoHostAvailableException | OperationTimedOutException e) {
+                    lastError = e;
+                    if (e instanceof OperationTimedOutException &&
+                        !Boolean.TRUE.equals(statement.isIdempotent())) {
+                        throw new BackendException(
+                                "Cassandra query timed out and won't be " +
+                                "retried because the statement is not " +
+                                "marked idempotent", e);
+                    }
+                    if (attempt >= retries) {
+                        break;
+                    }
+                    long cap = maxDelay > 0 ? maxDelay : interval;
+                    long shift = 1L << Math.min(attempt, 20);
+                    long delay;
+                    try {
+                        // Guard against Long overflow when retryInterval is huge.
+                        delay = Math.min(Math.multiplyExact(interval, shift), cap);
+                    } catch (ArithmeticException overflow) {
+                        delay = cap;
+                    }
+                    LOG.warn("Cassandra temporarily unavailable ({}), " +
+                             "retry {}/{} in {} ms",
+                             e.getClass().getSimpleName(), attempt + 1,
+                             retries, delay);
+                    try {
+                        Thread.sleep(delay);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new BackendException("Interrupted while " +
+                                                   "waiting to retry " +
+                                                   "Cassandra query", ie);
+                    }
+                }
+            }
+            // Preserve original exception as cause (stack trace + type) by
+            // pre-formatting the message and using the (String, Throwable)
+            // constructor explicitly to avoid ambiguity with varargs overloads.
+            String msg = String.format(
+                    "Failed to execute Cassandra query after %s retries: %s",
+                    retries,
+                    lastError == null ? "<null>" : lastError.getMessage());
+            throw new BackendException(msg, lastError);
         }
 
         private void tryOpen() {
@@ -217,6 +368,24 @@ public class CassandraSessionPool extends BackendSessionPool {
             }
         }
 
+        private com.datastax.driver.core.Session sessionForAsyncCommit() {
+            if (this.session == null || this.session.isClosed()) {
+                this.session = null;
+                try {
+                    this.open();
+                } catch (DriverException e) {
+                    throw new BackendException(
+                            "Failed to open Cassandra session for async commit",
+                            e);
+                }
+            }
+            if (this.session == null) {
+                throw new BackendException(
+                        "Cassandra session is unavailable for async commit");
+            }
+            return this.session;
+        }
+
         @Override
         public void open() {
             this.opened = true;
@@ -255,6 +424,56 @@ public class CassandraSessionPool extends BackendSessionPool {
             return this.batch.size() > 0;
         }
 
+        /**
+         * Periodic liveness probe invoked by {@link BackendSessionPool} to
+         * recover thread-local sessions after Cassandra has been restarted.
+         * Reopens the driver session if it was closed and pings the cluster
+         * with a lightweight query. On failure the session is discarded via
+         * {@link #reset()} so the next call to
+         * {@link #executeWithRetry(Statement)} reopens it; any exception
+         * here is swallowed so the caller can still issue the real query.
+         */
+        @Override
+        public void reconnectIfNeeded() {
+            if (!this.opened) {
+                return;
+            }
+            try {
+                if (this.session == null || this.session.isClosed()) {
+                    this.session = null;
+                    this.tryOpen();
+                }
+                if (this.session != null) {
+                    this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
+                }
+            } catch (NoHostAvailableException | OperationTimedOutException e) {
+                LOG.debug("Cassandra health-check failed, resetting session: {}",
+                          e.getMessage());
+                this.reset();
+            }
+        }
+
+        /**
+         * Force-close the driver session so it is re-opened on the next
+         * {@link #opened()} call. Used when a failure is observed and we
+         * want to start fresh on the next attempt.
+         */
+        @Override
+        public void reset() {
+            if (this.session == null) {
+                return;
+            }
+            try {
+                this.session.close();
+            } catch (Exception e) {
+                // Do not swallow Error (OOM / StackOverflow); only log
+                // ordinary exceptions raised by the driver on close.
+                LOG.warn("Failed to reset Cassandra session", e);
+            } finally {
+                this.session = null;
+            }
+        }
+
         public Collection<Statement> statements() {
             return this.batch.getStatements();
         }
diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
index ef5a8e896..6445fc38b 100644
--- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
+++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.hugegraph.unit.cassandra;
 
+import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.store.cassandra.CassandraOptions;
+import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool;
 import org.apache.hugegraph.backend.store.cassandra.CassandraStore;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.config.OptionSpace;
@@ -30,7 +34,14 @@ import org.apache.hugegraph.testutil.Whitebox;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
+import org.mockito.Mockito;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
@@ -192,4 +203,235 @@ public class CassandraTest {
             Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config);
         });
     }
+
+    @Test
+    public void testReconnectOptionsHaveSensibleDefaults() {
+        // Runtime-reconnection options must exist with non-zero defaults so
+        // HugeGraph keeps running when Cassandra restarts (issue #2740).
+        Assert.assertEquals(1000L, (long) CassandraOptions
+                .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue());
+        Assert.assertEquals(10_000L, (long) CassandraOptions
+                .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue());
+        Assert.assertEquals(3, (int) CassandraOptions
+                .CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.defaultValue());
+        Assert.assertEquals(1000L, (long) CassandraOptions
+                .CASSANDRA_QUERY_RETRY_INTERVAL.defaultValue());
+    }
+
+    @Test
+    public void testReconnectOptionsAreOverridable() {
+        String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name();
+        String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name();
+        String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS
+                                         .name();
+        String interval = CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name();
+
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(base, 500L);
+        conf.setProperty(max, 30_000L);
+        conf.setProperty(retries, 3);
+        conf.setProperty(interval, 1000L);
+        HugeConfig config = new HugeConfig(conf);
+
+        Assert.assertEquals(500L, (long) config.get(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY));
+        Assert.assertEquals(30_000L, (long) config.get(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY));
+        Assert.assertEquals(3, (int) config.get(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS));
+        Assert.assertEquals(1000L, (long) config.get(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL));
+    }
+
+    @Test
+    public void testQueryRetryAttemptsCanBeDisabled() {
+        String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS
+                                         .name();
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(retries, 0);
+        HugeConfig config = new HugeConfig(conf);
+        Assert.assertEquals(0, (int) config.get(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS));
+    }
+
+    @Test
+    public void testExecuteWithRetrySucceedsAfterTransientFailures() {
+        // Configure retry knobs via config so the pool reads them through
+        // the normal path (no Whitebox overrides on retry fields). Keep the
+        // values within the validators' lower bounds (base >= 100, max >=
+        // base, interval >= 100).
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+        HugeConfig config = new HugeConfig(conf);
+        CassandraSessionPool pool = new CassandraSessionPool(config,
+                                                             "ks", "store");
+
+        com.datastax.driver.core.Session driverSession = Mockito.mock(
+                com.datastax.driver.core.Session.class);
+        ResultSet rs = Mockito.mock(ResultSet.class);
+        NoHostAvailableException transientFailure =
+                new NoHostAvailableException(Collections.emptyMap());
+        Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
+               .thenThrow(transientFailure)
+               .thenThrow(transientFailure)
+               .thenReturn(rs);
+
+        CassandraSessionPool.Session session = pool.new Session();
+        Whitebox.setInternalState(session, "session", driverSession);
+
+        ResultSet result = session.execute("SELECT now() FROM system.local");
+        Assert.assertSame(rs, result);
+        Mockito.verify(driverSession, Mockito.times(3))
+               .execute(Mockito.any(Statement.class));
+        Mockito.verify(driverSession, Mockito.never()).close();
+    }
+
+    @Test
+    public void testExecuteWithRetrySkipsNonIdempotentTimeoutRetry() {
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+        HugeConfig config = new HugeConfig(conf);
+        CassandraSessionPool pool = new CassandraSessionPool(config,
+                                                             "ks", "store");
+
+        com.datastax.driver.core.Session driverSession = Mockito.mock(
+                com.datastax.driver.core.Session.class);
+        OperationTimedOutException timeout = new OperationTimedOutException(
+                new InetSocketAddress("127.0.0.1", 9042));
+        Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
+               .thenThrow(timeout);
+
+        CassandraSessionPool.Session session = pool.new Session();
+        Whitebox.setInternalState(session, "session", driverSession);
+
+        Assert.assertThrows(BackendException.class, () ->
+                session.execute("UPDATE counter SET value = value + 1"));
+        Mockito.verify(driverSession, Mockito.times(1))
+               .execute(Mockito.any(Statement.class));
+    }
+
+    @Test
+    public void testExecuteWithRetryAllowsIdempotentTimeoutRetry() {
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+        HugeConfig config = new HugeConfig(conf);
+        CassandraSessionPool pool = new CassandraSessionPool(config,
+                                                             "ks", "store");
+
+        com.datastax.driver.core.Session driverSession = Mockito.mock(
+                com.datastax.driver.core.Session.class);
+        ResultSet rs = Mockito.mock(ResultSet.class);
+        OperationTimedOutException timeout = new OperationTimedOutException(
+                new InetSocketAddress("127.0.0.1", 9042));
+        SimpleStatement statement = new SimpleStatement(
+                "SELECT now() FROM system.local");
+        statement.setIdempotent(true);
+        Mockito.when(driverSession.execute(statement))
+               .thenThrow(timeout)
+               .thenReturn(rs);
+
+        CassandraSessionPool.Session session = pool.new Session();
+        Whitebox.setInternalState(session, "session", driverSession);
+
+        ResultSet result = session.execute(statement);
+        Assert.assertSame(rs, result);
+        Mockito.verify(driverSession, Mockito.times(2)).execute(statement);
+    }
+
+    @Test
+    public void testCommitAsyncOpensSessionBeforeExecuteAsync() {
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L);
+        HugeConfig config = new HugeConfig(conf);
+        CassandraSessionPool pool = new CassandraSessionPool(config,
+                                                             "ks", "store");
+
+        com.datastax.driver.core.Cluster mockCluster = Mockito.mock(
+                com.datastax.driver.core.Cluster.class);
+        com.datastax.driver.core.Session driverSession = Mockito.mock(
+                com.datastax.driver.core.Session.class);
+        ResultSetFuture future = Mockito.mock(ResultSetFuture.class);
+        Mockito.when(mockCluster.isClosed()).thenReturn(false);
+        Mockito.when(mockCluster.connect(Mockito.anyString()))
+               .thenReturn(driverSession);
+        Mockito.when(driverSession.executeAsync(Mockito.any(Statement.class)))
+               .thenReturn(future);
+        Whitebox.setInternalState(pool, "cluster", mockCluster);
+
+        CassandraSessionPool.Session session = pool.new Session();
+        Statement statement = new SimpleStatement(
+                "INSERT INTO system.local(key) VALUES ('test')");
+        session.add(statement);
+
+        session.commitAsync();
+
+        Mockito.verify(mockCluster, Mockito.times(1)).connect("ks");
+        Mockito.verify(driverSession, Mockito.times(1)).executeAsync(statement);
+        Mockito.verify(future, Mockito.times(1)).getUninterruptibly();
+        Assert.assertFalse(session.hasChanges());
+    }
+
+    @Test
+    public void testReconnectBaseDelayBelowMinimumRejected() {
+        // The validator on CASSANDRA_RECONNECT_BASE_DELAY is
+        // rangeInt(100L, Long.MAX_VALUE); values below 100 must be rejected
+        // at parse time. Setting the property as a String forces HugeConfig
+        // to run parseConvert() which invokes the range check.
+        Configuration conf = new PropertiesConfiguration();
+        Assert.assertThrows(Exception.class, () -> {
+            conf.setProperty(
+                    CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
+                    "50");
+            new HugeConfig(conf);
+        });
+    }
+
+    @Test
+    public void testReconnectMaxDelayLessThanBaseRejected() {
+        // Both values must pass their individual range validators with margin
+        // (base >= 100, max >= 1000), so the only thing that can throw is the
+        // E.checkArgument(max >= base) cross-check inside the pool ctor. Set
+        // all four retry/reconnect properties explicitly so the test does not
+        // depend on default values changing in CassandraOptions.
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 10_000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 2_000L);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3);
+        conf.setProperty(
+                CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 1_000L);
+        HugeConfig config = new HugeConfig(conf);
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+                new CassandraSessionPool(config, "ks", "store"));
+    }
 }

Reply via email to