http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/pom.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/pom.xml b/modules/cassandra/store/pom.xml new file mode 100644 index 0000000..0b233fa --- /dev/null +++ b/modules/cassandra/store/pom.xml @@ -0,0 +1,305 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-cassandra</artifactId> + <version>1.8.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>ignite-cassandra-store</artifactId> + <version>1.8.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <properties> + <commons-beanutils.version>1.8.3</commons-beanutils.version> + <cassandra-driver.version>3.0.0</cassandra-driver.version> + <cassandra-all.version>3.3</cassandra-all.version> + <netty.version>4.0.33.Final</netty.version> + <guava.version>19.0</guava.version> + <metrics-core.version>3.0.2</metrics-core.version> + </properties> + + <dependencies> + <!-- Apache commons --> + <dependency> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + <version>${commons-beanutils.version}</version> + </dependency> + + <!-- Ignite --> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- Cassandra and required dependencies --> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra-driver.version}</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics-core.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-all</artifactId> + <version>${cassandra-all.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- Apache log4j --> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + <compilerVersion>1.7</compilerVersion> + <encoding>UTF-8</encoding> + <fork>true</fork> + <debug>false</debug> + <debuglevel>lines,vars,source</debuglevel> + <meminitial>256</meminitial> + <maxmem>512</maxmem> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.10</version> + <executions> + <execution> + <id>copy-all-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/tests-package/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <excludeArtifactIds> + netty-all,cassandra-all,snappy-java,lz4,compress-lzf,commons-codec,commons-lang3,commons-math3, + concurrentlinkedhashmap-lru,antlr,ST4,antlr-runtime,jcl-over-slf4j,jackson-core-asl, + jackson-mapper-asl,json-simple,high-scale-lib,snakeyaml,jbcrypt,reporter-config3, + reporter-config-base,hibernate-validator,validation-api,jboss-logging,thrift-server, + disruptor,stream,fastutil,logback-core,logback-classic,libthrift,httpclient,httpcore, + cassandra-thrift,jna,jamm,joda-time,sigar,ecj,tools + </excludeArtifactIds> + </configuration> + </execution> +<!-- --> + <execution> + <id>copy-main-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/libs</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <excludeTransitive>true</excludeTransitive> + <excludeGroupIds> + org.apache.ignite,org.springframework,org.gridgain + </excludeGroupIds> + <excludeArtifactIds> + commons-logging,slf4j-api,cache-api,slf4j-api,aopalliance + </excludeArtifactIds> + <includeScope>runtime</includeScope> + </configuration> + </execution> +<!-- --> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.8</version> + <dependencies> + <dependency> + <groupId>ant-contrib</groupId> + <artifactId>ant-contrib</artifactId> + <version>1.0b3</version> + <exclusions> + <exclusion> + <groupId>ant</groupId> + <artifactId>ant</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <executions> + <execution> + <id>package-tests</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <taskdef resource="net/sf/antcontrib/antlib.xml" /> + <if> + <available file="${project.build.directory}/test-classes" type="dir" /> + <then> + <copy todir="${project.build.directory}/tests-package/lib"> + <fileset dir="${project.build.directory}"> + <include name="*.jar" /> + </fileset> + </copy> + + <jar destfile="${project.build.directory}/tests-package/lib/${project.artifactId}-${project.version}-tests.jar"> + <fileset dir="${project.build.directory}/test-classes"> + <include name="**/*.class" /> + </fileset> + </jar> + + <copy todir="${project.build.directory}/tests-package/settings"> + <fileset dir="${project.build.directory}/test-classes"> + <include name="**/*.properties" /> + <include name="**/*.xml" /> + </fileset> + </copy> + + <copy todir="${project.build.directory}/tests-package"> + <fileset dir="${project.build.testSourceDirectory}/../scripts"> + <include name="**/*" /> + </fileset> + </copy> + + <fixcrlf srcdir="${project.build.directory}/tests-package" eol="lf" eof="remove"> + <include name="*.sh" /> + </fixcrlf> + + <copy todir="${project.build.directory}/tests-package"> + <fileset dir="${project.build.testSourceDirectory}/.."> + <include name="bootstrap/**" /> + </fileset> + </copy> + + <fixcrlf srcdir="${project.build.directory}/tests-package/bootstrap" eol="lf" eof="remove"> + <include name="**" /> + </fixcrlf> + + <zip destfile="${project.build.directory}/ignite-cassandra-tests-${project.version}.zip" compress="true" whenempty="create" level="9" encoding="UTF-8" useLanguageEncodingFlag="true" createUnicodeExtraFields="not-encodeable"> + + <zipfileset dir="${project.build.directory}/tests-package" prefix="ignite-cassandra-tests"> + <exclude name="**/*.sh" /> + </zipfileset> + + <zipfileset dir="${project.build.directory}/tests-package" prefix="ignite-cassandra-tests" filemode="555"> + <include name="**/*.sh" /> + </zipfileset> + </zip> + </then> + </if> + </target> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> +</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java new file mode 100644 index 0000000..f7e7917 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.cache.Cache; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreSession; +import org.apache.ignite.cache.store.cassandra.datasource.DataSource; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; +import org.apache.ignite.cache.store.cassandra.session.CassandraSession; +import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant; +import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant; +import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.LoggerResource; + +/** + * Implementation of {@link CacheStore} backed by Cassandra database. + * + * @param <K> Ignite cache key type. + * @param <V> Ignite cache value type. + */ +public class CassandraCacheStore<K, V> implements CacheStore<K, V> { + /** Connection attribute property name. */ + private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION"; + + /** Auto-injected store session. */ + @CacheStoreSessionResource + private CacheStoreSession storeSes; + + /** Auto-injected logger instance. */ + @LoggerResource + private IgniteLogger log; + + /** Cassandra data source. */ + private DataSource dataSrc; + + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSize = Runtime.getRuntime().availableProcessors(); + + /** Controller component responsible for serialization logic. */ + private PersistenceController controller; + + /** + * Store constructor. + * + * @param dataSrc Data source. + * @param settings Persistence settings for Ignite key and value objects. + * @param maxPoolSize Max workers thread count. + */ + public CassandraCacheStore(DataSource dataSrc, KeyValuePersistenceSettings settings, int maxPoolSize) { + this.dataSrc = dataSrc; + this.controller = new PersistenceController(settings); + this.maxPoolSize = maxPoolSize; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException { + if (clo == null || args == null || args.length == 0) + return; + + ExecutorService pool = null; + + Collection<Future<?>> futs = new ArrayList<>(args.length); + + try { + pool = Executors.newFixedThreadPool(maxPoolSize); + + CassandraSession ses = getCassandraSession(); + + for (Object obj : args) { + if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select")) + continue; + + futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo))); + } + + for (Future<?> fut : futs) + U.get(fut); + + if (log != null && log.isDebugEnabled() && storeSes != null) + log.debug("Cache loaded from db: " + storeSes.cacheName()); + } + catch (IgniteCheckedException e) { + if (storeSes != null) + throw new CacheLoaderException("Failed to load Ignite cache: " + storeSes.cacheName(), e.getCause()); + else + throw new CacheLoaderException("Failed to load cache", e.getCause()); + } + finally { + U.shutdownNow(getClass(), pool, log); + } + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + if (storeSes == null || storeSes.transaction() == null) + return; + + CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP); + + U.closeQuiet(cassandraSes); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public V load(final K key) throws CacheLoaderException { + if (key == null) + return null; + + CassandraSession ses = getCassandraSession(); + + try { + return ses.execute(new ExecutionAssistant<V>() { + @Override public boolean tableExistenceRequired() { + return false; + } + + @Override public String getStatement() { + return controller.getLoadStatement(false); + } + + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKey(statement, key); + } + + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + @Override public String operationName() { + return "READ"; + } + + @Override public V process(Row row) { + return row == null ? null : (V)controller.buildValueObject(row); + } + }); + } + finally { + closeCassandraSession(ses); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { + if (keys == null || !keys.iterator().hasNext()) + return new HashMap<>(); + + CassandraSession ses = getCassandraSession(); + + try { + return ses.execute(new GenericBatchExecutionAssistant<Map<K, V>, K>() { + private Map<K, V> data = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getLoadStatement(true); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, K key) { + return controller.bindKey(statement, key); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "BULK_READ"; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> processedData() { + return data; + } + + /** {@inheritDoc} */ + @Override protected void process(Row row) { + data.put((K)controller.buildKeyObject(row), (V)controller.buildValueObject(row)); + } + }, keys); + } + finally { + closeCassandraSession(ses); + } + } + + /** {@inheritDoc} */ + @Override public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException { + if (entry == null || entry.getKey() == null) + return; + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new ExecutionAssistant<Void>() { + @Override public boolean tableExistenceRequired() { + return true; + } + + @Override public String getStatement() { + return controller.getWriteStatement(); + } + + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKeyValue(statement, entry.getKey(), entry.getValue()); + } + + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + @Override public String operationName() { + return "WRITE"; + } + + @Override public Void process(Row row) { + return null; + } + }); + } + finally { + closeCassandraSession(ses); + } + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException { + if (entries == null || entries.isEmpty()) + return; + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new GenericBatchExecutionAssistant<Void, Cache.Entry<? extends K, ? extends V>>() { + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getWriteStatement(); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, + Cache.Entry<? extends K, ? extends V> entry) { + return controller.bindKeyValue(statement, entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "BULK_WRITE"; + } + + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return true; + } + }, entries); + } + finally { + closeCassandraSession(ses); + } + } + + /** {@inheritDoc} */ + @Override public void delete(final Object key) throws CacheWriterException { + if (key == null) + return; + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new ExecutionAssistant<Void>() { + @Override public boolean tableExistenceRequired() { + return false; + } + + @Override public String getStatement() { + return controller.getDeleteStatement(); + } + + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKey(statement, key); + } + + + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + @Override public String operationName() { + return "DELETE"; + } + + @Override public Void process(Row row) { + return null; + } + }); + } + finally { + closeCassandraSession(ses); + } + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + if (keys == null || keys.isEmpty()) + return; + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new GenericBatchExecutionAssistant<Void, Object>() { + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getDeleteStatement(); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, Object key) { + return controller.bindKey(statement, key); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + @Override public String operationName() { + return "BULK_DELETE"; + } + }, keys); + } + finally { + closeCassandraSession(ses); + } + } + + /** + * Gets Cassandra session wrapper or creates new if it doesn't exist. + * This wrapper hides all the low-level Cassandra interaction details by providing only high-level methods. + * + * @return Cassandra session wrapper. + */ + private CassandraSession getCassandraSession() { + if (storeSes == null || storeSes.transaction() == null) + return dataSrc.session(log != null ? log : new NullLogger()); + + CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP); + + if (ses == null) { + ses = dataSrc.session(log != null ? log : new NullLogger()); + storeSes.properties().put(ATTR_CONN_PROP, ses); + } + + return ses; + } + + /** + * Releases Cassandra related resources. + * + * @param ses Cassandra session wrapper. + */ + private void closeCassandraSession(CassandraSession ses) { + if (ses != null && (storeSes == null || storeSes.transaction() == null)) + U.closeQuiet(ses); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java new file mode 100644 index 0000000..7584dfb --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra; + +import javax.cache.configuration.Factory; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.store.cassandra.datasource.DataSource; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.internal.IgniteComponentType; +import org.apache.ignite.internal.util.spring.IgniteSpringHelper; +import org.apache.ignite.resources.SpringApplicationContextResource; + +/** + * Factory class to instantiate {@link CassandraCacheStore}. + * + * @param <K> Ignite cache key type + * @param <V> Ignite cache value type + */ +public class CassandraCacheStoreFactory<K, V> implements Factory<CassandraCacheStore<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Auto-injected Spring ApplicationContext resource. */ + @SpringApplicationContextResource + private Object appCtx; + + /** Name of data source bean. */ + private String dataSrcBean; + + /** Name of persistence settings bean. */ + private String persistenceSettingsBean; + + /** Data source. */ + private transient DataSource dataSrc; + + /** Persistence settings. */ + private KeyValuePersistenceSettings persistenceSettings; + + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSize = Runtime.getRuntime().availableProcessors(); + + /** {@inheritDoc} */ + @Override public CassandraCacheStore<K, V> create() { + return new CassandraCacheStore<>(getDataSource(), getPersistenceSettings(), getMaxPoolSize()); + } + + /** + * Sets data source. + * + * @param dataSrc Data source. + * + * @return {@code This} for chaining. + */ + @SuppressWarnings("UnusedDeclaration") + public CassandraCacheStoreFactory<K, V> setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + + return this; + } + + /** + * Sets data source bean name. + * + * @param beanName Data source bean name. + * @return {@code This} for chaining. + */ + public CassandraCacheStoreFactory<K, V> setDataSourceBean(String beanName) { + this.dataSrcBean = beanName; + + return this; + } + + /** + * Sets persistence settings. + * + * @param settings Persistence settings. + * @return {@code This} for chaining. + */ + @SuppressWarnings("UnusedDeclaration") + public CassandraCacheStoreFactory<K, V> setPersistenceSettings(KeyValuePersistenceSettings settings) { + this.persistenceSettings = settings; + + return this; + } + + /** + * Sets persistence settings bean name. + * + * @param beanName Persistence settings bean name. + * @return {@code This} for chaining. + */ + public CassandraCacheStoreFactory<K, V> setPersistenceSettingsBean(String beanName) { + this.persistenceSettingsBean = beanName; + + return this; + } + + /** + * @return Data source. + */ + private DataSource getDataSource() { + if (dataSrc != null) + return dataSrc; + + if (dataSrcBean == null) + throw new IllegalStateException("Either DataSource bean or DataSource itself should be specified"); + + if (appCtx == null) { + throw new IllegalStateException("Failed to get Cassandra DataSource cause Spring application " + + "context wasn't injected into CassandraCacheStoreFactory"); + } + + Object obj = loadSpringContextBean(appCtx, dataSrcBean); + + if (!(obj instanceof DataSource)) + throw new IllegalStateException("Incorrect connection bean '" + dataSrcBean + "' specified"); + + return dataSrc = (DataSource)obj; + } + + /** + * @return Persistence settings. + */ + private KeyValuePersistenceSettings getPersistenceSettings() { + if (persistenceSettings != null) + return persistenceSettings; + + if (persistenceSettingsBean == null) { + throw new IllegalStateException("Either persistence settings bean or persistence settings itself " + + "should be specified"); + } + + if (appCtx == null) { + throw new IllegalStateException("Failed to get Cassandra persistence settings cause Spring application " + + "context wasn't injected into CassandraCacheStoreFactory"); + } + + Object obj = loadSpringContextBean(appCtx, persistenceSettingsBean); + + if (!(obj instanceof KeyValuePersistenceSettings)) { + throw new IllegalStateException("Incorrect persistence settings bean '" + + persistenceSettingsBean + "' specified"); + } + + return persistenceSettings = (KeyValuePersistenceSettings)obj; + } + + /** + * Get maximum workers thread count. These threads are responsible for queries execution. + * + * @return Maximum workers thread count. + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Set Maximum workers thread count. These threads are responsible for queries execution. + * + * @param maxPoolSize Max workers thread count. + * @return {@code This} for chaining. + */ + public CassandraCacheStoreFactory<K, V> setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + + return this; + } + + /** + * Loads bean from Spring ApplicationContext. + * + * @param appCtx Application context. + * @param beanName Bean name to load. + * @return Loaded bean. + */ + private Object loadSpringContextBean(Object appCtx, String beanName) { + try { + IgniteSpringHelper spring = IgniteComponentType.SPRING.create(false); + return spring.loadBeanFromAppContext(appCtx, beanName); + } + catch (Exception e) { + throw new IgniteException("Failed to load bean in application context [beanName=" + beanName + ", igniteConfig=" + appCtx + ']', e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java new file mode 100644 index 0000000..d3bff7f --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.common; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.ReadTimeoutException; +import java.util.regex.Pattern; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Helper class providing methods to work with Cassandra session and exceptions + */ +public class CassandraHelper { + /** Cassandra error message if specified keyspace doesn't exist. */ + private static final Pattern KEYSPACE_EXIST_ERROR1 = Pattern.compile("Keyspace [0-9a-zA-Z_]+ does not exist"); + + /** Cassandra error message if trying to create table inside nonexistent keyspace. */ + private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*"); + + /** Cassandra error message if specified table doesn't exist. */ + private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+"); + + /** Cassandra error message if trying to use prepared statement created from another session. */ + private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " + + "was created with another Cluster instance"; + + /** Closes Cassandra driver session. */ + public static void closeSession(Session driverSes) { + if (driverSes == null) + return; + + Cluster cluster = driverSes.getCluster(); + + if (!driverSes.isClosed()) + U.closeQuiet(driverSes); + + if (!cluster.isClosed()) + U.closeQuiet(cluster); + } + + /** + * Checks if Cassandra keyspace absence error occur. + * + * @param e Exception to check. + * @return {@code true} in case of keyspace absence error. + */ + public static boolean isKeyspaceAbsenceError(Throwable e) { + while (e != null) { + if (e instanceof InvalidQueryException && + (KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || + KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) + return true; + + e = e.getCause(); + } + + return false; + } + + /** + * Checks if Cassandra table absence error occur. + * + * @param e Exception to check. + * @return {@code true} in case of table absence error. + */ + public static boolean isTableAbsenceError(Throwable e) { + while (e != null) { + if (e instanceof InvalidQueryException && + (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() || + KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || + KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) + return true; + + e = e.getCause(); + } + + return false; + } + + /** + * Checks if Cassandra host availability error occur, thus host became unavailable. + * + * @param e Exception to check. + * @return {@code true} in case of host not available error. + */ + public static boolean isHostsAvailabilityError(Throwable e) { + while (e != null) { + if (e instanceof NoHostAvailableException || + e instanceof ReadTimeoutException) + return true; + + e = e.getCause(); + } + + return false; + } + + /** + * Checks if Cassandra error occur because of prepared statement created in one session was used in another session. + * + * @param e Exception to check. + * @return {@code true} in case of invalid usage of prepared statement. + */ + public static boolean isPreparedStatementClusterError(Throwable e) { + while (e != null) { + if (e instanceof InvalidQueryException && e.getMessage().contains(PREP_STATEMENT_CLUSTER_INSTANCE_ERROR)) + return true; + + e = e.getCause(); + } + + return false; + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java new file mode 100644 index 0000000..9053a93 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.common; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Row; +import java.beans.PropertyDescriptor; +import java.lang.annotation.Annotation; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.beanutils.PropertyUtils; +import org.apache.ignite.cache.store.cassandra.serializer.Serializer; + +/** + * Helper class providing bunch of methods to discover fields of POJO objects and + * map builtin Java types to appropriate Cassandra types. + */ +public class PropertyMappingHelper { + /** Bytes array Class type. */ + private static final Class BYTES_ARRAY_CLASS = (new byte[] {}).getClass(); + + /** Mapping from Java to Cassandra types. */ + private static final Map<Class, DataType.Name> JAVA_TO_CASSANDRA_MAPPING = new HashMap<Class, DataType.Name>() {{ + put(String.class, DataType.Name.TEXT); + put(Integer.class, DataType.Name.INT); + put(int.class, DataType.Name.INT); + put(Short.class, DataType.Name.INT); + put(short.class, DataType.Name.INT); + put(Long.class, DataType.Name.BIGINT); + put(long.class, DataType.Name.BIGINT); + put(Double.class, DataType.Name.DOUBLE); + put(double.class, DataType.Name.DOUBLE); + put(Boolean.class, DataType.Name.BOOLEAN); + put(boolean.class, DataType.Name.BOOLEAN); + put(Float.class, DataType.Name.FLOAT); + put(float.class, DataType.Name.FLOAT); + put(ByteBuffer.class, DataType.Name.BLOB); + put(BYTES_ARRAY_CLASS, DataType.Name.BLOB); + put(BigDecimal.class, DataType.Name.DECIMAL); + put(InetAddress.class, DataType.Name.INET); + put(Date.class, DataType.Name.TIMESTAMP); + put(UUID.class, DataType.Name.UUID); + put(BigInteger.class, DataType.Name.VARINT); + }}; + + /** + * Maps Cassandra type to specified Java type. + * + * @param clazz java class. + * + * @return Cassandra type. + */ + public static DataType.Name getCassandraType(Class clazz) { + return JAVA_TO_CASSANDRA_MAPPING.get(clazz); + } + + /** + * Returns property descriptor by class property name. + * + * @param clazz class from which to get property descriptor. + * @param prop name of the property. + * + * @return property descriptor. + */ + public static PropertyDescriptor getPojoPropertyDescriptor(Class clazz, String prop) { + List<PropertyDescriptor> descriptors = getPojoPropertyDescriptors(clazz, false); + + if (descriptors == null || descriptors.isEmpty()) + throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property"); + + for (PropertyDescriptor descriptor : descriptors) { + if (descriptor.getName().equals(prop)) + return descriptor; + } + + throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property"); + } + + /** + * Extracts all property descriptors from a class. + * + * @param clazz class which property descriptors should be extracted. + * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted. + * + * @return list of class property descriptors + */ + public static List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, boolean primitive) { + return getPojoPropertyDescriptors(clazz, null, primitive); + } + + /** + * Extracts all property descriptors having specific annotation from a class. + * + * @param clazz class which property descriptors should be extracted. + * @param annotation annotation to look for. + * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted. + * + * @return list of class property descriptors + */ + public static <T extends Annotation> List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, + Class<T> annotation, boolean primitive) { + PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(clazz); + + List<PropertyDescriptor> list = new ArrayList<>(descriptors == null ? 1 : descriptors.length); + + if (descriptors == null || descriptors.length == 0) + return list; + + for (PropertyDescriptor descriptor : descriptors) { + if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null || + (primitive && !isPrimitivePropertyDescriptor(descriptor))) + continue; + + if (annotation == null || descriptor.getReadMethod().getAnnotation(annotation) != null) + list.add(descriptor); + } + + return list; + } + + /** + * Checks if property descriptor describes primitive property (int, boolean, long and etc.) + * + * @param desc property descriptor. + * + * @return {@code true} property is primitive + */ + public static boolean isPrimitivePropertyDescriptor(PropertyDescriptor desc) { + return PropertyMappingHelper.JAVA_TO_CASSANDRA_MAPPING.containsKey(desc.getPropertyType()); + } + + /** + * Returns value of specific column in the row returned by CQL statement. + * + * @param row row returned by CQL statement. + * @param col column name. + * @param clazz java class to which column value should be casted. + * @param serializer serializer to use if column stores BLOB otherwise could be null. + * + * @return row column value. + */ + public static Object getCassandraColumnValue(Row row, String col, Class clazz, Serializer serializer) { + if (String.class.equals(clazz)) + return row.getString(col); + + if (Integer.class.equals(clazz) || int.class.equals(clazz)) + return row.getInt(col); + + if (Short.class.equals(clazz) || short.class.equals(clazz)) + return (short)row.getInt(col); + + if (Long.class.equals(clazz) || long.class.equals(clazz)) + return row.getLong(col); + + if (Double.class.equals(clazz) || double.class.equals(clazz)) + return row.getDouble(col); + + if (Boolean.class.equals(clazz) || boolean.class.equals(clazz)) + return row.getBool(col); + + if (Float.class.equals(clazz) || float.class.equals(clazz)) + return row.getFloat(col); + + if (ByteBuffer.class.equals(clazz)) + return row.getBytes(col); + + if (PropertyMappingHelper.BYTES_ARRAY_CLASS.equals(clazz)) { + ByteBuffer buf = row.getBytes(col); + + return buf == null ? null : buf.array(); + } + + if (BigDecimal.class.equals(clazz)) + return row.getDecimal(col); + + if (InetAddress.class.equals(clazz)) + return row.getInet(col); + + if (Date.class.equals(clazz)) + return row.getTimestamp(col); + + if (UUID.class.equals(clazz)) + return row.getUUID(col); + + if (BigInteger.class.equals(clazz)) + return row.getVarint(col); + + if (serializer == null) { + throw new IllegalStateException("Can't deserialize value from '" + col + "' Cassandra column, " + + "cause there is no BLOB serializer specified"); + } + + ByteBuffer buf = row.getBytes(col); + + return buf == null ? null : serializer.deserialize(buf); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java new file mode 100644 index 0000000..6745a16 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.common; + +import java.util.Random; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; + +/** + * Provides sleep method with randomly selected sleep time from specified range and + * incrementally shifts sleep time range for each next sleep attempt + * + */ +public class RandomSleeper { + /** */ + private int min; + + /** */ + private int max; + + /** */ + private int incr; + + /** */ + private IgniteLogger log; + + /** */ + private Random random = new Random(System.currentTimeMillis()); + + /** */ + private int summary = 0; + + /** + * Creates sleeper instance. + * + * @param min minimum sleep time (in milliseconds) + * @param max maximum sleep time (in milliseconds) + * @param incr time range shift increment (in milliseconds) + */ + public RandomSleeper(int min, int max, int incr, IgniteLogger log) { + if (min <= 0) + throw new IllegalArgumentException("Incorrect min time specified: " + min); + + if (max <= min) + throw new IllegalArgumentException("Incorrect max time specified: " + max); + + if (incr < 10) + throw new IllegalArgumentException("Incorrect increment specified: " + incr); + + this.min = min; + this.max = max; + this.incr = incr; + this.log = log; + } + + /** + * Sleeps + */ + public void sleep() { + try { + int timeout = random.nextInt(max - min + 1) + min; + + if (log != null) + log.info("Sleeping for " + timeout + "ms"); + + Thread.sleep(timeout); + + summary += timeout; + + if (log != null) + log.info("Sleep completed"); + } + catch (InterruptedException e) { + throw new IgniteException("Random sleep interrupted", e); + } + + min += incr; + max += incr; + } + + /** + * Returns summary sleep time. + * + * @return Summary sleep time in milliseconds. + */ + public int getSleepSummary() { + return summary; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java new file mode 100644 index 0000000..5d51488 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.common; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Helper class providing system information about the host (ip, hostname, os and etc.) + */ +public class SystemHelper { + /** System line separator. */ + public static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** Host name. */ + public static final String HOST_NAME; + + /** Host IP address */ + public static final String HOST_IP; + + static { + try { + InetAddress addr = InetAddress.getLocalHost(); + HOST_NAME = addr.getHostName(); + HOST_IP = addr.getHostAddress(); + } + catch (UnknownHostException e) { + throw new IllegalStateException("Failed to get host/ip of current computer", e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java new file mode 100644 index 0000000..c4f5d3b --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains commonly used helper classes + */ +package org.apache.ignite.cache.store.cassandra.common; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java new file mode 100644 index 0000000..a2358a6 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.datasource; + +import java.io.Serializable; + +/** + * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML). + */ +public interface Credentials extends Serializable { + /** + * Returns user name + * + * @return user name + */ + public String getUser(); + + /** + * Returns password + * + * @return password + */ + public String getPassword(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java new file mode 100644 index 0000000..f582aac --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.datasource; + +import com.datastax.driver.core.AuthProvider; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.NettyOptions; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.AddressTranslator; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.cassandra.session.CassandraSession; +import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Data source abstraction to specify configuration of the Cassandra session to be used. + */ +public class DataSource implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Null object, used as a replacement for those Cassandra connection options which + * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc). + */ + private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9"); + + /** Number of rows to immediately fetch in CQL statement execution. */ + private Integer fetchSize; + + /** Consistency level for READ operations. */ + private ConsistencyLevel readConsistency; + + /** Consistency level for WRITE operations. */ + private ConsistencyLevel writeConsistency; + + /** Username to use for authentication. */ + private String user; + + /** Password to use for authentication. */ + private String pwd; + + /** Port to use for Cassandra connection. */ + private Integer port; + + /** List of contact points to connect to Cassandra cluster. */ + private List<InetAddress> contactPoints; + + /** List of contact points with ports to connect to Cassandra cluster. */ + private List<InetSocketAddress> contactPointsWithPorts; + + /** Maximum time to wait for schema agreement before returning from a DDL query. */ + private Integer maxSchemaAgreementWaitSeconds; + + /** The native protocol version to use. */ + private Integer protoVer; + + /** Compression to use for the transport. */ + private String compression; + + /** Use SSL for communications with Cassandra. */ + private Boolean useSSL; + + /** Enables metrics collection. */ + private Boolean collectMetrix; + + /** Enables JMX reporting of the metrics. */ + private Boolean jmxReporting; + + /** Credentials to use for authentication. */ + private Credentials creds; + + /** Load balancing policy to use. */ + private LoadBalancingPolicy loadBalancingPlc; + + /** Reconnection policy to use. */ + private ReconnectionPolicy reconnectionPlc; + + /** Retry policy to use. */ + private RetryPolicy retryPlc; + + /** Address translator to use. */ + private AddressTranslator addrTranslator; + + /** Speculative execution policy to use. */ + private SpeculativeExecutionPolicy speculativeExecutionPlc; + + /** Authentication provider to use. */ + private AuthProvider authProvider; + + /** SSL options to use. */ + private SSLOptions sslOptions; + + /** Connection pooling options to use. */ + private PoolingOptions poolingOptions; + + /** Socket options to use. */ + private SocketOptions sockOptions; + + /** Netty options to use for connection. */ + private NettyOptions nettyOptions; + + /** Cassandra session wrapper instance. */ + private volatile CassandraSession ses; + + /** + * Sets user name to use for authentication. + * + * @param user user name + */ + @SuppressWarnings("UnusedDeclaration") + public void setUser(String user) { + this.user = user; + + invalidate(); + } + + /** + * Sets password to use for authentication. + * + * @param pwd password + */ + @SuppressWarnings("UnusedDeclaration") + public void setPassword(String pwd) { + this.pwd = pwd; + + invalidate(); + } + + /** + * Sets port to use for Cassandra connection. + * + * @param port port + */ + @SuppressWarnings("UnusedDeclaration") + public void setPort(int port) { + this.port = port; + + invalidate(); + } + + /** + * Sets list of contact points to connect to Cassandra cluster. + * + * @param points contact points + */ + public void setContactPoints(String... points) { + if (points == null || points.length == 0) + return; + + for (String point : points) { + if (point.contains(":")) { + if (contactPointsWithPorts == null) + contactPointsWithPorts = new LinkedList<>(); + + String[] chunks = point.split(":"); + + try { + contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim()))); + } + catch (Throwable e) { + throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); + } + } + else { + if (contactPoints == null) + contactPoints = new LinkedList<>(); + + try { + contactPoints.add(InetAddress.getByName(point)); + } + catch (Throwable e) { + throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); + } + } + } + + invalidate(); + } + + /** Sets maximum time to wait for schema agreement before returning from a DDL query. */ + @SuppressWarnings("UnusedDeclaration") + public void setMaxSchemaAgreementWaitSeconds(int seconds) { + maxSchemaAgreementWaitSeconds = seconds; + + invalidate(); + } + + /** + * Sets the native protocol version to use. + * + * @param ver version number + */ + @SuppressWarnings("UnusedDeclaration") + public void setProtocolVersion(int ver) { + protoVer = ver; + + invalidate(); + } + + /** + * Sets compression algorithm to use for the transport. + * + * @param compression Compression algorithm. + */ + @SuppressWarnings("UnusedDeclaration") + public void setCompression(String compression) { + this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim(); + + try { + if (this.compression != null) + ProtocolOptions.Compression.valueOf(this.compression); + } + catch (Throwable e) { + throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e); + } + + invalidate(); + } + + /** + * Enables SSL for communications with Cassandra. + * + * @param use Flag to enable/disable SSL. + */ + @SuppressWarnings("UnusedDeclaration") + public void setUseSSL(boolean use) { + useSSL = use; + + invalidate(); + } + + /** + * Enables metrics collection. + * + * @param collect Flag to enable/disable metrics collection. + */ + @SuppressWarnings("UnusedDeclaration") + public void setCollectMetrix(boolean collect) { + collectMetrix = collect; + + invalidate(); + } + + /** + * Enables JMX reporting of the metrics. + * + * @param enableReporting Flag to enable/disable JMX reporting. + */ + @SuppressWarnings("UnusedDeclaration") + public void setJmxReporting(boolean enableReporting) { + jmxReporting = enableReporting; + + invalidate(); + } + + /** + * Sets number of rows to immediately fetch in CQL statement execution. + * + * @param size Number of rows to fetch. + */ + @SuppressWarnings("UnusedDeclaration") + public void setFetchSize(int size) { + fetchSize = size; + + invalidate(); + } + + /** + * Set consistency level for READ operations. + * + * @param level Consistency level. + */ + public void setReadConsistency(String level) { + readConsistency = parseConsistencyLevel(level); + + invalidate(); + } + + /** + * Set consistency level for WRITE operations. + * + * @param level Consistency level. + */ + public void setWriteConsistency(String level) { + writeConsistency = parseConsistencyLevel(level); + + invalidate(); + } + + /** + * Sets credentials to use for authentication. + * + * @param creds Credentials. + */ + public void setCredentials(Credentials creds) { + this.creds = creds; + + invalidate(); + } + + /** + * Sets load balancing policy. + * + * @param plc Load balancing policy. + */ + public void setLoadBalancingPolicy(LoadBalancingPolicy plc) { + loadBalancingPlc = plc; + + invalidate(); + } + + /** + * Sets reconnection policy. + * + * @param plc Reconnection policy. + */ + @SuppressWarnings("UnusedDeclaration") + public void setReconnectionPolicy(ReconnectionPolicy plc) { + reconnectionPlc = plc; + + invalidate(); + } + + /** + * Sets retry policy. + * + * @param plc Retry policy. + */ + @SuppressWarnings("UnusedDeclaration") + public void setRetryPolicy(RetryPolicy plc) { + retryPlc = plc; + + invalidate(); + } + + /** + * Sets address translator. + * + * @param translator Address translator. + */ + @SuppressWarnings("UnusedDeclaration") + public void setAddressTranslator(AddressTranslator translator) { + addrTranslator = translator; + + invalidate(); + } + + /** + * Sets speculative execution policy. + * + * @param plc Speculative execution policy. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) { + speculativeExecutionPlc = plc; + + invalidate(); + } + + /** + * Sets authentication provider. + * + * @param provider Authentication provider. + */ + @SuppressWarnings("UnusedDeclaration") + public void setAuthProvider(AuthProvider provider) { + authProvider = provider; + + invalidate(); + } + + /** + * Sets SSL options. + * + * @param options SSL options. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSslOptions(SSLOptions options) { + sslOptions = options; + + invalidate(); + } + + /** + * Sets pooling options. + * + * @param options pooling options to use. + */ + @SuppressWarnings("UnusedDeclaration") + public void setPoolingOptions(PoolingOptions options) { + poolingOptions = options; + + invalidate(); + } + + /** + * Sets socket options to use. + * + * @param options Socket options. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSocketOptions(SocketOptions options) { + sockOptions = options; + + invalidate(); + } + + /** + * Sets netty options to use. + * + * @param options netty options. + */ + @SuppressWarnings("UnusedDeclaration") + public void setNettyOptions(NettyOptions options) { + nettyOptions = options; + + invalidate(); + } + + /** + * Creates Cassandra session wrapper if it wasn't created yet and returns it + * + * @param log logger + * @return Cassandra session wrapper + */ + @SuppressWarnings("deprecation") + public synchronized CassandraSession session(IgniteLogger log) { + if (ses != null) + return ses; + + Cluster.Builder builder = Cluster.builder(); + + if (user != null) + builder = builder.withCredentials(user, pwd); + + if (port != null) + builder = builder.withPort(port); + + if (contactPoints != null) + builder = builder.addContactPoints(contactPoints); + + if (contactPointsWithPorts != null) + builder = builder.addContactPointsWithPorts(contactPointsWithPorts); + + if (maxSchemaAgreementWaitSeconds != null) + builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds); + + if (protoVer != null) + builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer)); + + if (compression != null) { + try { + builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase())); + } + catch (IllegalArgumentException e) { + throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e); + } + } + + if (useSSL != null && useSSL) + builder = builder.withSSL(); + + if (sslOptions != null) + builder = builder.withSSL(sslOptions); + + if (collectMetrix != null && !collectMetrix) + builder = builder.withoutMetrics(); + + if (jmxReporting != null && !jmxReporting) + builder = builder.withoutJMXReporting(); + + if (creds != null) + builder = builder.withCredentials(creds.getUser(), creds.getPassword()); + + if (loadBalancingPlc != null) + builder = builder.withLoadBalancingPolicy(loadBalancingPlc); + + if (reconnectionPlc != null) + builder = builder.withReconnectionPolicy(reconnectionPlc); + + if (retryPlc != null) + builder = builder.withRetryPolicy(retryPlc); + + if (addrTranslator != null) + builder = builder.withAddressTranslator(addrTranslator); + + if (speculativeExecutionPlc != null) + builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc); + + if (authProvider != null) + builder = builder.withAuthProvider(authProvider); + + if (poolingOptions != null) + builder = builder.withPoolingOptions(poolingOptions); + + if (sockOptions != null) + builder = builder.withSocketOptions(sockOptions); + + if (nettyOptions != null) + builder = builder.withNettyOptions(nettyOptions); + + return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(fetchSize); + out.writeObject(readConsistency); + out.writeObject(writeConsistency); + U.writeString(out, user); + U.writeString(out, pwd); + out.writeObject(port); + out.writeObject(contactPoints); + out.writeObject(contactPointsWithPorts); + out.writeObject(maxSchemaAgreementWaitSeconds); + out.writeObject(protoVer); + U.writeString(out, compression); + out.writeObject(useSSL); + out.writeObject(collectMetrix); + out.writeObject(jmxReporting); + out.writeObject(creds); + writeObject(out, loadBalancingPlc); + writeObject(out, reconnectionPlc); + writeObject(out, addrTranslator); + writeObject(out, speculativeExecutionPlc); + writeObject(out, authProvider); + writeObject(out, sslOptions); + writeObject(out, poolingOptions); + writeObject(out, sockOptions); + writeObject(out, nettyOptions); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fetchSize = (Integer)in.readObject(); + readConsistency = (ConsistencyLevel)in.readObject(); + writeConsistency = (ConsistencyLevel)in.readObject(); + user = U.readString(in); + pwd = U.readString(in); + port = (Integer)in.readObject(); + contactPoints = (List<InetAddress>)in.readObject(); + contactPointsWithPorts = (List<InetSocketAddress>)in.readObject(); + maxSchemaAgreementWaitSeconds = (Integer)in.readObject(); + protoVer = (Integer)in.readObject(); + compression = U.readString(in); + useSSL = (Boolean)in.readObject(); + collectMetrix = (Boolean)in.readObject(); + jmxReporting = (Boolean)in.readObject(); + creds = (Credentials)in.readObject(); + loadBalancingPlc = (LoadBalancingPolicy)readObject(in); + reconnectionPlc = (ReconnectionPolicy)readObject(in); + addrTranslator = (AddressTranslator)readObject(in); + speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in); + authProvider = (AuthProvider)readObject(in); + sslOptions = (SSLOptions)readObject(in); + poolingOptions = (PoolingOptions)readObject(in); + sockOptions = (SocketOptions)readObject(in); + nettyOptions = (NettyOptions)readObject(in); + } + + /** + * Helper method used to serialize class members + * @param out the stream to write the object to + * @param obj the object to be written + * @throws IOException Includes any I/O exceptions that may occur + */ + private void writeObject(ObjectOutput out, Object obj) throws IOException { + out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj); + } + + /** + * Helper method used to deserialize class members + * @param in the stream to read data from in order to restore the object + * @throws IOException Includes any I/O exceptions that may occur + * @throws ClassNotFoundException If the class for an object being restored cannot be found + * @return deserialized object + */ + private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException { + Object obj = in.readObject(); + return NULL_OBJECT.equals(obj) ? null : obj; + } + + /** + * Parses consistency level provided as string. + * + * @param level consistency level string. + * + * @return consistency level. + */ + private ConsistencyLevel parseConsistencyLevel(String level) { + if (level == null) + return null; + + try { + return ConsistencyLevel.valueOf(level.trim().toUpperCase()); + } + catch (Throwable e) { + throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e); + } + } + + /** + * Invalidates session. + */ + private synchronized void invalidate() { + ses = null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java new file mode 100644 index 0000000..46ebdc5 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.datasource; + +/** + * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values. + */ +public class PlainCredentials implements Credentials { + /** */ + private static final long serialVersionUID = 0L; + + /** User name. */ + private String user; + + /** User password. */ + private String pwd; + + /** + * Creates credentials object. + * + * @param user User name. + * @param pwd User password. + */ + public PlainCredentials(String user, String pwd) { + this.user = user; + this.pwd = pwd; + } + + /** {@inheritDoc} */ + @Override public String getUser() { + return user; + } + + /** {@inheritDoc} */ + @Override public String getPassword() { + return pwd; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java new file mode 100644 index 0000000..d5003ae --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains data source implementation + */ +package org.apache.ignite.cache.store.cassandra.datasource; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java new file mode 100644 index 0000000..46f5635 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains {@link org.apache.ignite.cache.store.CacheStore} implementation backed by Cassandra database + */ +package org.apache.ignite.cache.store.cassandra; \ No newline at end of file