IGNITE-4954 - Configurable expiration timeout for Cassandra session. This closes #1785.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/735ce60d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/735ce60d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/735ce60d Branch: refs/heads/ignite-2.0 Commit: 735ce60da02ebadc43aaa29cc97d331b8056df36 Parents: 36a6cd0 Author: Valentin Kulichenko <valentin.luliche...@gmail.com> Authored: Thu Apr 13 11:29:30 2017 +0300 Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Committed: Tue Apr 18 16:59:50 2017 +0300 ---------------------------------------------------------------------- .../store/cassandra/datasource/DataSource.java | 50 ++++++++++++++------ .../cassandra/session/CassandraSessionImpl.java | 23 +++++---- .../cassandra/session/pool/SessionPool.java | 6 +-- .../cassandra/session/pool/SessionWrapper.java | 15 +++--- 4 files changed, 62 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java index 1ba3c7d..754d902 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java @@ -17,6 +17,16 @@ package org.apache.ignite.cache.store.cassandra.datasource; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; import com.datastax.driver.core.AuthProvider; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; @@ -31,25 +41,13 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; - import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.cassandra.session.CassandraSession; import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Data source abstraction to specify configuration of the Cassandra session to be used. @@ -64,6 +62,9 @@ public class DataSource implements Externalizable { */ private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9"); + /** Default expiration timeout for Cassandra driver session. */ + public static final long DFLT_SESSION_EXPIRATION_TIMEOUT = 300000; // 5 minutes. + /** Number of rows to immediately fetch in CQL statement execution. */ private Integer fetchSize; @@ -141,6 +142,9 @@ public class DataSource implements Externalizable { /** Netty options to use for connection. */ private NettyOptions nettyOptions; + /** Expiration timeout for Cassandra driver session. */ + private long sessionExpirationTimeout = DFLT_SESSION_EXPIRATION_TIMEOUT; + /** Cassandra session wrapper instance. */ private volatile CassandraSession ses; @@ -460,6 +464,23 @@ public class DataSource implements Externalizable { } /** + * Sets expiration timeout for Cassandra driver session. Idle sessions that are not + * used during this timeout value will be automatically closed and recreated later + * on demand. + * <p> + * If set to {@code 0}, timeout is disabled. + * <p> + * Default value is {@link #DFLT_SESSION_EXPIRATION_TIMEOUT}. + * + * @param sessionExpirationTimeout Expiration timeout for Cassandra driver session. + */ + public void setSessionExpirationTimeout(long sessionExpirationTimeout) { + this.sessionExpirationTimeout = sessionExpirationTimeout; + + invalidate(); + } + + /** * Creates Cassandra session wrapper if it wasn't created yet and returns it * * @param log logger @@ -541,7 +562,8 @@ public class DataSource implements Externalizable { if (nettyOptions != null) builder = builder.withNettyOptions(nettyOptions); - return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log); + return ses = new CassandraSessionImpl( + builder, fetchSize, readConsistency, writeConsistency, sessionExpirationTimeout, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java index ac11686..19b88c9 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java @@ -17,6 +17,13 @@ package org.apache.ignite.cache.store.cassandra.session; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.Cluster; @@ -30,13 +37,6 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.querybuilder.Batch; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.Cache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; @@ -83,6 +83,9 @@ public class CassandraSessionImpl implements CassandraSession { /** Consistency level for Cassandra WRITE operations (insert/update/delete). */ private ConsistencyLevel writeConsistency; + /** Expiration timeout. */ + private long expirationTimeout; + /** Logger. */ private IgniteLogger log; @@ -102,11 +105,12 @@ public class CassandraSessionImpl implements CassandraSession { * @param log Logger. */ public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency, - ConsistencyLevel writeConsistency, IgniteLogger log) { + ConsistencyLevel writeConsistency, long expirationTimeout, IgniteLogger log) { this.builder = builder; this.fetchSize = fetchSize; this.readConsistency = readConsistency; this.writeConsistency = writeConsistency; + this.expirationTimeout = expirationTimeout; this.log = log; } @@ -504,7 +508,8 @@ public class CassandraSessionImpl implements CassandraSession { /** {@inheritDoc} */ @Override public synchronized void close() throws IOException { if (decrementSessionRefs() == 0 && ses != null) { - SessionPool.put(this, ses); + SessionPool.put(this, ses, expirationTimeout); + ses = null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java index 95938bd..4de8516 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java @@ -17,13 +17,13 @@ package org.apache.ignite.cache.store.cassandra.session.pool; -import com.datastax.driver.core.Session; import java.lang.Thread.State; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import com.datastax.driver.core.Session; import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; /** @@ -98,14 +98,14 @@ public class SessionPool { * @param cassandraSes Session wrapper. * @param driverSes Driver session. */ - public static void put(CassandraSessionImpl cassandraSes, Session driverSes) { + public static void put(CassandraSessionImpl cassandraSes, Session driverSes, long expirationTimeout) { if (cassandraSes == null || driverSes == null) return; SessionWrapper old; synchronized (sessions) { - old = sessions.put(cassandraSes, new SessionWrapper(driverSes)); + old = sessions.put(cassandraSes, new SessionWrapper(driverSes, expirationTimeout)); if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) { monitorSingleton = new SessionMonitor(); http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java index 7c5722b..68b9dd4 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java @@ -24,12 +24,12 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing. */ public class SessionWrapper { - /** Expiration timeout for Cassandra driver session. */ - public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes. - /** Cassandra driver session. */ private Session ses; + /** Expiration timeout. */ + private long expirationTimeout; + /** Wrapper creation time. */ private long time; @@ -38,9 +38,11 @@ public class SessionWrapper { * * @param ses Cassandra driver session. */ - public SessionWrapper(Session ses) { + public SessionWrapper(Session ses, long expirationTimeout) { this.ses = ses; - this.time = System.currentTimeMillis(); + this.expirationTimeout = expirationTimeout; + + time = System.currentTimeMillis(); } /** @@ -49,7 +51,7 @@ public class SessionWrapper { * @return true if session expired. */ public boolean expired() { - return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT; + return expirationTimeout > 0 && System.currentTimeMillis() - time > expirationTimeout; } /** @@ -66,6 +68,7 @@ public class SessionWrapper { */ public void release() { CassandraHelper.closeSession(ses); + ses = null; } }