Repository: ignite Updated Branches: refs/heads/master 988ca3224 -> 8cd9fbe59
IGNTIE-4220: Support statements for JDBC and Cassandra store. This closes #1898. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cd9fbe5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cd9fbe5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cd9fbe5 Branch: refs/heads/master Commit: 8cd9fbe59050b788ce9cec7f0f2b59bca36e7545 Parents: 988ca32 Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Thu May 4 17:08:29 2017 +0300 Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Committed: Thu May 4 18:20:47 2017 +0300 ---------------------------------------------------------------------- .../store/cassandra/CassandraCacheStore.java | 16 ++++- .../session/LoadCacheCustomQueryWorker.java | 26 +++++-- .../ignite/tests/IgnitePersistentStoreTest.java | 30 ++++++--- .../tests/persistence/blob/ignite-config.xml | 4 +- .../tests/persistence/pojo/ignite-config.xml | 4 +- .../persistence/primitive/ignite-config.xml | 4 +- .../primitive/ignite-remote-server-config.xml | 4 +- .../store/jdbc/CacheAbstractJdbcStore.java | 71 ++++++++++++++------ .../CacheJdbcPojoStoreAbstractSelfTest.java | 49 ++++++++++++++ 9 files changed, 163 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java index 70d798b..98c8b40 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@ -20,6 +20,7 @@ package org.apache.ignite.cache.store.cassandra; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -113,10 +114,19 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { CassandraSession ses = getCassandraSession(); for (Object obj : args) { - if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select")) - continue; + LoadCacheCustomQueryWorker<K, V> task = null; - futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo))); + if (obj instanceof Statement) + task = new LoadCacheCustomQueryWorker<>(ses, (Statement)obj, controller, log, clo); + else if (obj instanceof String) { + String qry = ((String)obj).trim(); + + if (qry.toLowerCase().startsWith("select")) + task = new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo); + } + + if (task != null) + futs.add(pool.submit(task)); } for (Future<?> fut : futs) http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java index d3ace7d..d186b98 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java @@ -36,8 +36,8 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> { /** Cassandra session to execute CQL query */ private final CassandraSession ses; - /** User query. */ - private final String qry; + /** Statement. */ + private final Statement stmt; /** Persistence controller */ private final PersistenceController ctrl; @@ -49,12 +49,28 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> { private final IgniteBiInClosure<K, V> clo; /** + * @param ses Session. + * @param qry Query. + * @param ctrl Control. + * @param log Logger. * @param clo Closure for loaded values. */ public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl, - IgniteLogger log, IgniteBiInClosure<K, V> clo) { + IgniteLogger log, IgniteBiInClosure<K, V> clo) { + this(ses, new SimpleStatement(qry.trim().endsWith(";") ? qry : qry + ';'), ctrl, log, clo); + } + + /** + * @param ses Session. + * @param stmt Statement. + * @param ctrl Control. + * @param log Logger. + * @param clo Closure for loaded values. + */ + public LoadCacheCustomQueryWorker(CassandraSession ses, Statement stmt, PersistenceController ctrl, + IgniteLogger log, IgniteBiInClosure<K, V> clo) { this.ses = ses; - this.qry = qry.trim().endsWith(";") ? qry : qry + ";"; + this.stmt = stmt; this.ctrl = ctrl; this.log = log; this.clo = clo; @@ -70,7 +86,7 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> { /** {@inheritDoc} */ @Override public Statement getStatement() { - return new SimpleStatement(qry); + return stmt; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java index 97e7230..c8c7139 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.tests; +import com.datastax.driver.core.SimpleStatement; import java.util.Collection; import java.util.Date; import java.util.Map; @@ -43,6 +44,7 @@ import org.apache.log4j.Logger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.Assert; import org.springframework.core.io.ClassPathResource; /** @@ -429,31 +431,39 @@ public class IgnitePersistentStoreTest { LOGGER.info("Running loadCache test"); try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) { - IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3")); + CacheConfiguration<PersonId, Person> ccfg = new CacheConfiguration<>("cache3"); + + IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(ccfg); + int size = personCache3.size(CachePeekMode.ALL); LOGGER.info("Initial cache size " + size); LOGGER.info("Loading cache data from Cassandra table"); - personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit 3"}); + String qry = "select * from test1.pojo_test3 limit 3"; + + personCache3.loadCache(null, qry); size = personCache3.size(CachePeekMode.ALL); - if (size != 3) { - throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " + - "Expected number of records is 3, but loaded number of records is " + size); - } + Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by '" + qry + "'", 3, size); + + personCache3.clear(); + + personCache3.loadCache(null, new SimpleStatement(qry)); + + size = personCache3.size(CachePeekMode.ALL); + Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by statement", 3, size); personCache3.clear(); personCache3.loadCache(null); size = personCache3.size(CachePeekMode.ALL); - if (size != TestsHelper.getBulkOperationSize()) { - throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " + + Assert.assertEquals("Cache data was incorrectly loaded from Cassandra. " + "Expected number of records is " + TestsHelper.getBulkOperationSize() + - ", but loaded number of records is " + size); - } + ", but loaded number of records is " + size, + TestsHelper.getBulkOperationSize(), size); LOGGER.info("Cache data loaded from Cassandra table"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml index fbf38e9..db360d5 100644 --- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml @@ -78,8 +78,8 @@ to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml index c9b45c8..af4ffef 100644 --- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml @@ -153,8 +153,8 @@ to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml index 13e0922..a7d101d 100644 --- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-config.xml @@ -78,8 +78,8 @@ to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml index 8d71aec..bbaff8c 100644 --- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/primitive/ignite-remote-server-config.xml @@ -94,8 +94,8 @@ to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index ba2a98d..46e9022 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -702,17 +702,34 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, } })) throw new CacheLoaderException("Provided key type is not found in store or cache configuration " + - "[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]"); - - String qry = args[i + 1].toString(); + "[cache=" + U.maskName(cacheName) + ", key=" + keyType + ']'); EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType)); - if (log.isInfoEnabled()) - log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) + - ", keyType=" + keyType + ", query=" + qry + "]"); + Object arg = args[i + 1]; + + LoadCacheCustomQueryWorker<K, V> task; + + if (arg instanceof PreparedStatement) { + PreparedStatement stmt = (PreparedStatement)arg; + + if (log.isInfoEnabled()) + log.info("Started load cache using custom statement [cache=" + U.maskName(cacheName) + + ", keyType=" + keyType + ", stmt=" + stmt + ']'); + + task = new LoadCacheCustomQueryWorker<>(em, stmt, clo); + } + else { + String qry = arg.toString(); - futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry, clo))); + if (log.isInfoEnabled()) + log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) + + ", keyType=" + keyType + ", query=" + qry + ']'); + + task = new LoadCacheCustomQueryWorker<>(em, qry, clo); + } + + futs.add(pool.submit(task)); } } else { @@ -727,7 +744,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, processedKeyTypes.add(keyType); if (log.isInfoEnabled()) - log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + "]"); + log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']'); if (parallelLoadCacheMinThreshold > 0) { Connection conn = null; @@ -744,7 +761,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, if (rs.next()) { if (log.isDebugEnabled()) log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) + - ", keyType=" + keyType + "]"); + ", keyType=" + keyType + ']'); int keyCnt = em.keyCols.size(); @@ -773,7 +790,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, } catch (SQLException e) { log.warning("Failed to load entries from db in multithreaded mode, will try in single thread " + - "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + " ]", e); + "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']', e); } finally { U.closeQuiet(conn); @@ -782,7 +799,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, if (log.isDebugEnabled()) log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) + - ", keyType=" + keyType + "]"); + ", keyType=" + keyType + ']'); futs.add(pool.submit(loadCacheFull(em, clo))); } @@ -809,7 +826,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key)); if (log.isDebugEnabled()) - log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]"); + log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + ']'); Connection conn = null; @@ -1910,14 +1927,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, /** Entry mapping description. */ private final EntryMapping em; + /** User statement. */ + private PreparedStatement stmt; + /** User query. */ - private final String qry; + private String qry; /** Closure for loaded values. */ private final IgniteBiInClosure<K1, V1> clo; /** * @param em Entry mapping description. + * @param stmt User statement. + * @param clo Closure for loaded values. + */ + private LoadCacheCustomQueryWorker(EntryMapping em, PreparedStatement stmt, IgniteBiInClosure<K1, V1> clo) { + this.em = em; + this.stmt = stmt; + this.clo = clo; + } + + /** + * @param em Entry mapping description. * @param qry User query. * @param clo Closure for loaded values. */ @@ -1931,12 +1962,12 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, @Override public Void call() throws Exception { Connection conn = null; - PreparedStatement stmt = null; - try { - conn = openConnection(true); + if (stmt == null) { + conn = openConnection(true); - stmt = conn.prepareStatement(qry); + stmt = conn.prepareStatement(qry); + } stmt.setFetchSize(dialect.getFetchSize()); @@ -1962,9 +1993,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, throw new CacheLoaderException("Failed to execute custom query for load cache", e); } finally { - U.closeQuiet(stmt); + if (conn != null) { + U.closeQuiet(stmt); - U.closeQuiet(conn); + U.closeQuiet(conn); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8cd9fbe5/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java index 8544f71..703cbe1 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java @@ -337,6 +337,55 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr } /** + * Checks that data was loaded correctly with prepared statement. + */ + protected void checkCacheLoadWithStatement() throws SQLException { + Connection conn = null; + + PreparedStatement stmt = null; + + try { + conn = getConnection(); + + conn.setAutoCommit(true); + + String qry = "select id, org_id, name, birthday, gender from Person"; + + stmt = conn.prepareStatement(qry); + + IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME); + + c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", stmt); + + assertEquals(PERSON_CNT, c1.size()); + } + finally { + U.closeQuiet(stmt); + + U.closeQuiet(conn); + } + + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheWithStatement() throws Exception { + startTestGrid(false, false, false, false, 512); + + checkCacheLoadWithStatement(); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheWithStatementTx() throws Exception { + startTestGrid(false, false, false, true, 512); + + checkCacheLoadWithStatement(); + } + + /** * @throws Exception If failed. */ public void testLoadCache() throws Exception {