http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java deleted file mode 100644 index 7584dfb..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra; - -import javax.cache.configuration.Factory; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.store.cassandra.datasource.DataSource; -import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; -import org.apache.ignite.internal.IgniteComponentType; -import org.apache.ignite.internal.util.spring.IgniteSpringHelper; -import org.apache.ignite.resources.SpringApplicationContextResource; - -/** - * Factory class to instantiate {@link CassandraCacheStore}. - * - * @param <K> Ignite cache key type - * @param <V> Ignite cache value type - */ -public class CassandraCacheStoreFactory<K, V> implements Factory<CassandraCacheStore<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Auto-injected Spring ApplicationContext resource. */ - @SpringApplicationContextResource - private Object appCtx; - - /** Name of data source bean. */ - private String dataSrcBean; - - /** Name of persistence settings bean. */ - private String persistenceSettingsBean; - - /** Data source. */ - private transient DataSource dataSrc; - - /** Persistence settings. */ - private KeyValuePersistenceSettings persistenceSettings; - - /** Max workers thread count. These threads are responsible for load cache. */ - private int maxPoolSize = Runtime.getRuntime().availableProcessors(); - - /** {@inheritDoc} */ - @Override public CassandraCacheStore<K, V> create() { - return new CassandraCacheStore<>(getDataSource(), getPersistenceSettings(), getMaxPoolSize()); - } - - /** - * Sets data source. - * - * @param dataSrc Data source. - * - * @return {@code This} for chaining. - */ - @SuppressWarnings("UnusedDeclaration") - public CassandraCacheStoreFactory<K, V> setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - - return this; - } - - /** - * Sets data source bean name. - * - * @param beanName Data source bean name. - * @return {@code This} for chaining. 
- */ - public CassandraCacheStoreFactory<K, V> setDataSourceBean(String beanName) { - this.dataSrcBean = beanName; - - return this; - } - - /** - * Sets persistence settings. - * - * @param settings Persistence settings. - * @return {@code This} for chaining. - */ - @SuppressWarnings("UnusedDeclaration") - public CassandraCacheStoreFactory<K, V> setPersistenceSettings(KeyValuePersistenceSettings settings) { - this.persistenceSettings = settings; - - return this; - } - - /** - * Sets persistence settings bean name. - * - * @param beanName Persistence settings bean name. - * @return {@code This} for chaining. - */ - public CassandraCacheStoreFactory<K, V> setPersistenceSettingsBean(String beanName) { - this.persistenceSettingsBean = beanName; - - return this; - } - - /** - * @return Data source. - */ - private DataSource getDataSource() { - if (dataSrc != null) - return dataSrc; - - if (dataSrcBean == null) - throw new IllegalStateException("Either DataSource bean or DataSource itself should be specified"); - - if (appCtx == null) { - throw new IllegalStateException("Failed to get Cassandra DataSource cause Spring application " + - "context wasn't injected into CassandraCacheStoreFactory"); - } - - Object obj = loadSpringContextBean(appCtx, dataSrcBean); - - if (!(obj instanceof DataSource)) - throw new IllegalStateException("Incorrect connection bean '" + dataSrcBean + "' specified"); - - return dataSrc = (DataSource)obj; - } - - /** - * @return Persistence settings. 
- */ - private KeyValuePersistenceSettings getPersistenceSettings() { - if (persistenceSettings != null) - return persistenceSettings; - - if (persistenceSettingsBean == null) { - throw new IllegalStateException("Either persistence settings bean or persistence settings itself " + - "should be specified"); - } - - if (appCtx == null) { - throw new IllegalStateException("Failed to get Cassandra persistence settings cause Spring application " + - "context wasn't injected into CassandraCacheStoreFactory"); - } - - Object obj = loadSpringContextBean(appCtx, persistenceSettingsBean); - - if (!(obj instanceof KeyValuePersistenceSettings)) { - throw new IllegalStateException("Incorrect persistence settings bean '" + - persistenceSettingsBean + "' specified"); - } - - return persistenceSettings = (KeyValuePersistenceSettings)obj; - } - - /** - * Get maximum workers thread count. These threads are responsible for queries execution. - * - * @return Maximum workers thread count. - */ - public int getMaxPoolSize() { - return maxPoolSize; - } - - /** - * Set Maximum workers thread count. These threads are responsible for queries execution. - * - * @param maxPoolSize Max workers thread count. - * @return {@code This} for chaining. - */ - public CassandraCacheStoreFactory<K, V> setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - - return this; - } - - /** - * Loads bean from Spring ApplicationContext. - * - * @param appCtx Application context. - * @param beanName Bean name to load. - * @return Loaded bean. - */ - private Object loadSpringContextBean(Object appCtx, String beanName) { - try { - IgniteSpringHelper spring = IgniteComponentType.SPRING.create(false); - return spring.loadBeanFromAppContext(appCtx, beanName); - } - catch (Exception e) { - throw new IgniteException("Failed to load bean in application context [beanName=" + beanName + ", igniteConfig=" + appCtx + ']', e); - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java deleted file mode 100644 index d3bff7f..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.common; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.InvalidQueryException; -import com.datastax.driver.core.exceptions.NoHostAvailableException; -import com.datastax.driver.core.exceptions.ReadTimeoutException; -import java.util.regex.Pattern; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Helper class providing methods to work with Cassandra session and exceptions - */ -public class CassandraHelper { - /** Cassandra error message if specified keyspace doesn't exist. */ - private static final Pattern KEYSPACE_EXIST_ERROR1 = Pattern.compile("Keyspace [0-9a-zA-Z_]+ does not exist"); - - /** Cassandra error message if trying to create table inside nonexistent keyspace. */ - private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*"); - - /** Cassandra error message if specified table doesn't exist. */ - private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+"); - - /** Cassandra error message if trying to use prepared statement created from another session. */ - private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " + - "was created with another Cluster instance"; - - /** Closes Cassandra driver session. */ - public static void closeSession(Session driverSes) { - if (driverSes == null) - return; - - Cluster cluster = driverSes.getCluster(); - - if (!driverSes.isClosed()) - U.closeQuiet(driverSes); - - if (!cluster.isClosed()) - U.closeQuiet(cluster); - } - - /** - * Checks if Cassandra keyspace absence error occur. - * - * @param e Exception to check. - * @return {@code true} in case of keyspace absence error. 
- */ - public static boolean isKeyspaceAbsenceError(Throwable e) { - while (e != null) { - if (e instanceof InvalidQueryException && - (KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || - KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) - return true; - - e = e.getCause(); - } - - return false; - } - - /** - * Checks if Cassandra table absence error occur. - * - * @param e Exception to check. - * @return {@code true} in case of table absence error. - */ - public static boolean isTableAbsenceError(Throwable e) { - while (e != null) { - if (e instanceof InvalidQueryException && - (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() || - KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || - KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) - return true; - - e = e.getCause(); - } - - return false; - } - - /** - * Checks if Cassandra host availability error occur, thus host became unavailable. - * - * @param e Exception to check. - * @return {@code true} in case of host not available error. - */ - public static boolean isHostsAvailabilityError(Throwable e) { - while (e != null) { - if (e instanceof NoHostAvailableException || - e instanceof ReadTimeoutException) - return true; - - e = e.getCause(); - } - - return false; - } - - /** - * Checks if Cassandra error occur because of prepared statement created in one session was used in another session. - * - * @param e Exception to check. - * @return {@code true} in case of invalid usage of prepared statement. 
- */ - public static boolean isPreparedStatementClusterError(Throwable e) { - while (e != null) { - if (e instanceof InvalidQueryException && e.getMessage().contains(PREP_STATEMENT_CLUSTER_INSTANCE_ERROR)) - return true; - - e = e.getCause(); - } - - return false; - } -} - http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java deleted file mode 100644 index 9053a93..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.common; - -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.Row; -import java.beans.PropertyDescriptor; -import java.lang.annotation.Annotation; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.ignite.cache.store.cassandra.serializer.Serializer; - -/** - * Helper class providing bunch of methods to discover fields of POJO objects and - * map builtin Java types to appropriate Cassandra types. - */ -public class PropertyMappingHelper { - /** Bytes array Class type. */ - private static final Class BYTES_ARRAY_CLASS = (new byte[] {}).getClass(); - - /** Mapping from Java to Cassandra types. */ - private static final Map<Class, DataType.Name> JAVA_TO_CASSANDRA_MAPPING = new HashMap<Class, DataType.Name>() {{ - put(String.class, DataType.Name.TEXT); - put(Integer.class, DataType.Name.INT); - put(int.class, DataType.Name.INT); - put(Short.class, DataType.Name.INT); - put(short.class, DataType.Name.INT); - put(Long.class, DataType.Name.BIGINT); - put(long.class, DataType.Name.BIGINT); - put(Double.class, DataType.Name.DOUBLE); - put(double.class, DataType.Name.DOUBLE); - put(Boolean.class, DataType.Name.BOOLEAN); - put(boolean.class, DataType.Name.BOOLEAN); - put(Float.class, DataType.Name.FLOAT); - put(float.class, DataType.Name.FLOAT); - put(ByteBuffer.class, DataType.Name.BLOB); - put(BYTES_ARRAY_CLASS, DataType.Name.BLOB); - put(BigDecimal.class, DataType.Name.DECIMAL); - put(InetAddress.class, DataType.Name.INET); - put(Date.class, DataType.Name.TIMESTAMP); - put(UUID.class, DataType.Name.UUID); - put(BigInteger.class, DataType.Name.VARINT); - }}; - - /** - * Maps Cassandra type to specified Java 
type. - * - * @param clazz java class. - * - * @return Cassandra type. - */ - public static DataType.Name getCassandraType(Class clazz) { - return JAVA_TO_CASSANDRA_MAPPING.get(clazz); - } - - /** - * Returns property descriptor by class property name. - * - * @param clazz class from which to get property descriptor. - * @param prop name of the property. - * - * @return property descriptor. - */ - public static PropertyDescriptor getPojoPropertyDescriptor(Class clazz, String prop) { - List<PropertyDescriptor> descriptors = getPojoPropertyDescriptors(clazz, false); - - if (descriptors == null || descriptors.isEmpty()) - throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property"); - - for (PropertyDescriptor descriptor : descriptors) { - if (descriptor.getName().equals(prop)) - return descriptor; - } - - throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property"); - } - - /** - * Extracts all property descriptors from a class. - * - * @param clazz class which property descriptors should be extracted. - * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted. - * - * @return list of class property descriptors - */ - public static List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, boolean primitive) { - return getPojoPropertyDescriptors(clazz, null, primitive); - } - - /** - * Extracts all property descriptors having specific annotation from a class. - * - * @param clazz class which property descriptors should be extracted. - * @param annotation annotation to look for. - * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted. 
- * - * @return list of class property descriptors - */ - public static <T extends Annotation> List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, - Class<T> annotation, boolean primitive) { - PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(clazz); - - List<PropertyDescriptor> list = new ArrayList<>(descriptors == null ? 1 : descriptors.length); - - if (descriptors == null || descriptors.length == 0) - return list; - - for (PropertyDescriptor descriptor : descriptors) { - if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null || - (primitive && !isPrimitivePropertyDescriptor(descriptor))) - continue; - - if (annotation == null || descriptor.getReadMethod().getAnnotation(annotation) != null) - list.add(descriptor); - } - - return list; - } - - /** - * Checks if property descriptor describes primitive property (int, boolean, long and etc.) - * - * @param desc property descriptor. - * - * @return {@code true} property is primitive - */ - public static boolean isPrimitivePropertyDescriptor(PropertyDescriptor desc) { - return PropertyMappingHelper.JAVA_TO_CASSANDRA_MAPPING.containsKey(desc.getPropertyType()); - } - - /** - * Returns value of specific column in the row returned by CQL statement. - * - * @param row row returned by CQL statement. - * @param col column name. - * @param clazz java class to which column value should be casted. - * @param serializer serializer to use if column stores BLOB otherwise could be null. - * - * @return row column value. 
- */ - public static Object getCassandraColumnValue(Row row, String col, Class clazz, Serializer serializer) { - if (String.class.equals(clazz)) - return row.getString(col); - - if (Integer.class.equals(clazz) || int.class.equals(clazz)) - return row.getInt(col); - - if (Short.class.equals(clazz) || short.class.equals(clazz)) - return (short)row.getInt(col); - - if (Long.class.equals(clazz) || long.class.equals(clazz)) - return row.getLong(col); - - if (Double.class.equals(clazz) || double.class.equals(clazz)) - return row.getDouble(col); - - if (Boolean.class.equals(clazz) || boolean.class.equals(clazz)) - return row.getBool(col); - - if (Float.class.equals(clazz) || float.class.equals(clazz)) - return row.getFloat(col); - - if (ByteBuffer.class.equals(clazz)) - return row.getBytes(col); - - if (PropertyMappingHelper.BYTES_ARRAY_CLASS.equals(clazz)) { - ByteBuffer buf = row.getBytes(col); - - return buf == null ? null : buf.array(); - } - - if (BigDecimal.class.equals(clazz)) - return row.getDecimal(col); - - if (InetAddress.class.equals(clazz)) - return row.getInet(col); - - if (Date.class.equals(clazz)) - return row.getTimestamp(col); - - if (UUID.class.equals(clazz)) - return row.getUUID(col); - - if (BigInteger.class.equals(clazz)) - return row.getVarint(col); - - if (serializer == null) { - throw new IllegalStateException("Can't deserialize value from '" + col + "' Cassandra column, " + - "cause there is no BLOB serializer specified"); - } - - ByteBuffer buf = row.getBytes(col); - - return buf == null ? 
null : serializer.deserialize(buf); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java deleted file mode 100644 index 6745a16..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.common; - -import java.util.Random; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; - -/** - * Provides sleep method with randomly selected sleep time from specified range and - * incrementally shifts sleep time range for each next sleep attempt - * - */ -public class RandomSleeper { - /** */ - private int min; - - /** */ - private int max; - - /** */ - private int incr; - - /** */ - private IgniteLogger log; - - /** */ - private Random random = new Random(System.currentTimeMillis()); - - /** */ - private int summary = 0; - - /** - * Creates sleeper instance. - * - * @param min minimum sleep time (in milliseconds) - * @param max maximum sleep time (in milliseconds) - * @param incr time range shift increment (in milliseconds) - */ - public RandomSleeper(int min, int max, int incr, IgniteLogger log) { - if (min <= 0) - throw new IllegalArgumentException("Incorrect min time specified: " + min); - - if (max <= min) - throw new IllegalArgumentException("Incorrect max time specified: " + max); - - if (incr < 10) - throw new IllegalArgumentException("Incorrect increment specified: " + incr); - - this.min = min; - this.max = max; - this.incr = incr; - this.log = log; - } - - /** - * Sleeps - */ - public void sleep() { - try { - int timeout = random.nextInt(max - min + 1) + min; - - if (log != null) - log.info("Sleeping for " + timeout + "ms"); - - Thread.sleep(timeout); - - summary += timeout; - - if (log != null) - log.info("Sleep completed"); - } - catch (InterruptedException e) { - throw new IgniteException("Random sleep interrupted", e); - } - - min += incr; - max += incr; - } - - /** - * Returns summary sleep time. - * - * @return Summary sleep time in milliseconds. 
- */ - public int getSleepSummary() { - return summary; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java deleted file mode 100644 index 5d51488..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.common; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * Helper class providing system information about the host (ip, hostname, os and etc.) - */ -public class SystemHelper { - /** System line separator. */ - public static final String LINE_SEPARATOR = System.getProperty("line.separator"); - - /** Host name. 
/**
 * Exposes basic information about the local host (line separator, host name and IP address)
 * as constants resolved once at class initialization.
 */
public class SystemHelper {
    /** Line separator as reported by the {@code line.separator} system property. */
    public static final String LINE_SEPARATOR = System.getProperty("line.separator");

    /** Name of the local host. */
    public static final String HOST_NAME;

    /** IP address of the local host. */
    public static final String HOST_IP;

    static {
        InetAddress locHost;

        try {
            locHost = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            // Class initialization is aborted when the local host can't be resolved.
            throw new IllegalStateException("Failed to get host/ip of current computer", e);
        }

        HOST_NAME = locHost.getHostName();
        HOST_IP = locHost.getHostAddress();
    }
}
- */ - -/** - * Contains commonly used helper classes - */ -package org.apache.ignite.cache.store.cassandra.common; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java deleted file mode 100644 index a2358a6..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.datasource; - -import java.io.Serializable; - -/** - * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML). 
- */ -public interface Credentials extends Serializable { - /** - * Returns user name - * - * @return user name - */ - public String getUser(); - - /** - * Returns password - * - * @return password - */ - public String getPassword(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java deleted file mode 100644 index f582aac..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java +++ /dev/null @@ -1,647 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.datasource; - -import com.datastax.driver.core.AuthProvider; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.NettyOptions; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.ProtocolOptions; -import com.datastax.driver.core.ProtocolVersion; -import com.datastax.driver.core.SSLOptions; -import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.policies.AddressTranslator; -import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.datastax.driver.core.policies.ReconnectionPolicy; -import com.datastax.driver.core.policies.RetryPolicy; -import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; - -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.store.cassandra.session.CassandraSession; -import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Data source abstraction to specify configuration of the Cassandra session to be used. - */ -public class DataSource implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Null object, used as a replacement for those Cassandra connection options which - * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc). - */ - private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9"); - - /** Number of rows to immediately fetch in CQL statement execution. 
*/ - private Integer fetchSize; - - /** Consistency level for READ operations. */ - private ConsistencyLevel readConsistency; - - /** Consistency level for WRITE operations. */ - private ConsistencyLevel writeConsistency; - - /** Username to use for authentication. */ - private String user; - - /** Password to use for authentication. */ - private String pwd; - - /** Port to use for Cassandra connection. */ - private Integer port; - - /** List of contact points to connect to Cassandra cluster. */ - private List<InetAddress> contactPoints; - - /** List of contact points with ports to connect to Cassandra cluster. */ - private List<InetSocketAddress> contactPointsWithPorts; - - /** Maximum time to wait for schema agreement before returning from a DDL query. */ - private Integer maxSchemaAgreementWaitSeconds; - - /** The native protocol version to use. */ - private Integer protoVer; - - /** Compression to use for the transport. */ - private String compression; - - /** Use SSL for communications with Cassandra. */ - private Boolean useSSL; - - /** Enables metrics collection. */ - private Boolean collectMetrix; - - /** Enables JMX reporting of the metrics. */ - private Boolean jmxReporting; - - /** Credentials to use for authentication. */ - private Credentials creds; - - /** Load balancing policy to use. */ - private LoadBalancingPolicy loadBalancingPlc; - - /** Reconnection policy to use. */ - private ReconnectionPolicy reconnectionPlc; - - /** Retry policy to use. */ - private RetryPolicy retryPlc; - - /** Address translator to use. */ - private AddressTranslator addrTranslator; - - /** Speculative execution policy to use. */ - private SpeculativeExecutionPolicy speculativeExecutionPlc; - - /** Authentication provider to use. */ - private AuthProvider authProvider; - - /** SSL options to use. */ - private SSLOptions sslOptions; - - /** Connection pooling options to use. */ - private PoolingOptions poolingOptions; - - /** Socket options to use. 
*/ - private SocketOptions sockOptions; - - /** Netty options to use for connection. */ - private NettyOptions nettyOptions; - - /** Cassandra session wrapper instance. */ - private volatile CassandraSession ses; - - /** - * Sets user name to use for authentication. - * - * @param user user name - */ - @SuppressWarnings("UnusedDeclaration") - public void setUser(String user) { - this.user = user; - - invalidate(); - } - - /** - * Sets password to use for authentication. - * - * @param pwd password - */ - @SuppressWarnings("UnusedDeclaration") - public void setPassword(String pwd) { - this.pwd = pwd; - - invalidate(); - } - - /** - * Sets port to use for Cassandra connection. - * - * @param port port - */ - @SuppressWarnings("UnusedDeclaration") - public void setPort(int port) { - this.port = port; - - invalidate(); - } - - /** - * Sets list of contact points to connect to Cassandra cluster. - * - * @param points contact points - */ - public void setContactPoints(String... points) { - if (points == null || points.length == 0) - return; - - for (String point : points) { - if (point.contains(":")) { - if (contactPointsWithPorts == null) - contactPointsWithPorts = new LinkedList<>(); - - String[] chunks = point.split(":"); - - try { - contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim()))); - } - catch (Throwable e) { - throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); - } - } - else { - if (contactPoints == null) - contactPoints = new LinkedList<>(); - - try { - contactPoints.add(InetAddress.getByName(point)); - } - catch (Throwable e) { - throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); - } - } - } - - invalidate(); - } - - /** Sets maximum time to wait for schema agreement before returning from a DDL query. 
*/ - @SuppressWarnings("UnusedDeclaration") - public void setMaxSchemaAgreementWaitSeconds(int seconds) { - maxSchemaAgreementWaitSeconds = seconds; - - invalidate(); - } - - /** - * Sets the native protocol version to use. - * - * @param ver version number - */ - @SuppressWarnings("UnusedDeclaration") - public void setProtocolVersion(int ver) { - protoVer = ver; - - invalidate(); - } - - /** - * Sets compression algorithm to use for the transport. - * - * @param compression Compression algorithm. - */ - @SuppressWarnings("UnusedDeclaration") - public void setCompression(String compression) { - this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim(); - - try { - if (this.compression != null) - ProtocolOptions.Compression.valueOf(this.compression); - } - catch (Throwable e) { - throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e); - } - - invalidate(); - } - - /** - * Enables SSL for communications with Cassandra. - * - * @param use Flag to enable/disable SSL. - */ - @SuppressWarnings("UnusedDeclaration") - public void setUseSSL(boolean use) { - useSSL = use; - - invalidate(); - } - - /** - * Enables metrics collection. - * - * @param collect Flag to enable/disable metrics collection. - */ - @SuppressWarnings("UnusedDeclaration") - public void setCollectMetrix(boolean collect) { - collectMetrix = collect; - - invalidate(); - } - - /** - * Enables JMX reporting of the metrics. - * - * @param enableReporting Flag to enable/disable JMX reporting. - */ - @SuppressWarnings("UnusedDeclaration") - public void setJmxReporting(boolean enableReporting) { - jmxReporting = enableReporting; - - invalidate(); - } - - /** - * Sets number of rows to immediately fetch in CQL statement execution. - * - * @param size Number of rows to fetch. 
- */ - @SuppressWarnings("UnusedDeclaration") - public void setFetchSize(int size) { - fetchSize = size; - - invalidate(); - } - - /** - * Set consistency level for READ operations. - * - * @param level Consistency level. - */ - public void setReadConsistency(String level) { - readConsistency = parseConsistencyLevel(level); - - invalidate(); - } - - /** - * Set consistency level for WRITE operations. - * - * @param level Consistency level. - */ - public void setWriteConsistency(String level) { - writeConsistency = parseConsistencyLevel(level); - - invalidate(); - } - - /** - * Sets credentials to use for authentication. - * - * @param creds Credentials. - */ - public void setCredentials(Credentials creds) { - this.creds = creds; - - invalidate(); - } - - /** - * Sets load balancing policy. - * - * @param plc Load balancing policy. - */ - public void setLoadBalancingPolicy(LoadBalancingPolicy plc) { - loadBalancingPlc = plc; - - invalidate(); - } - - /** - * Sets reconnection policy. - * - * @param plc Reconnection policy. - */ - @SuppressWarnings("UnusedDeclaration") - public void setReconnectionPolicy(ReconnectionPolicy plc) { - reconnectionPlc = plc; - - invalidate(); - } - - /** - * Sets retry policy. - * - * @param plc Retry policy. - */ - @SuppressWarnings("UnusedDeclaration") - public void setRetryPolicy(RetryPolicy plc) { - retryPlc = plc; - - invalidate(); - } - - /** - * Sets address translator. - * - * @param translator Address translator. - */ - @SuppressWarnings("UnusedDeclaration") - public void setAddressTranslator(AddressTranslator translator) { - addrTranslator = translator; - - invalidate(); - } - - /** - * Sets speculative execution policy. - * - * @param plc Speculative execution policy. - */ - @SuppressWarnings("UnusedDeclaration") - public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) { - speculativeExecutionPlc = plc; - - invalidate(); - } - - /** - * Sets authentication provider. 
- * - * @param provider Authentication provider. - */ - @SuppressWarnings("UnusedDeclaration") - public void setAuthProvider(AuthProvider provider) { - authProvider = provider; - - invalidate(); - } - - /** - * Sets SSL options. - * - * @param options SSL options. - */ - @SuppressWarnings("UnusedDeclaration") - public void setSslOptions(SSLOptions options) { - sslOptions = options; - - invalidate(); - } - - /** - * Sets pooling options. - * - * @param options pooling options to use. - */ - @SuppressWarnings("UnusedDeclaration") - public void setPoolingOptions(PoolingOptions options) { - poolingOptions = options; - - invalidate(); - } - - /** - * Sets socket options to use. - * - * @param options Socket options. - */ - @SuppressWarnings("UnusedDeclaration") - public void setSocketOptions(SocketOptions options) { - sockOptions = options; - - invalidate(); - } - - /** - * Sets netty options to use. - * - * @param options netty options. - */ - @SuppressWarnings("UnusedDeclaration") - public void setNettyOptions(NettyOptions options) { - nettyOptions = options; - - invalidate(); - } - - /** - * Creates Cassandra session wrapper if it wasn't created yet and returns it - * - * @param log logger - * @return Cassandra session wrapper - */ - @SuppressWarnings("deprecation") - public synchronized CassandraSession session(IgniteLogger log) { - if (ses != null) - return ses; - - Cluster.Builder builder = Cluster.builder(); - - if (user != null) - builder = builder.withCredentials(user, pwd); - - if (port != null) - builder = builder.withPort(port); - - if (contactPoints != null) - builder = builder.addContactPoints(contactPoints); - - if (contactPointsWithPorts != null) - builder = builder.addContactPointsWithPorts(contactPointsWithPorts); - - if (maxSchemaAgreementWaitSeconds != null) - builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds); - - if (protoVer != null) - builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer)); - - if 
(compression != null) { - try { - builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase())); - } - catch (IllegalArgumentException e) { - throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e); - } - } - - if (useSSL != null && useSSL) - builder = builder.withSSL(); - - if (sslOptions != null) - builder = builder.withSSL(sslOptions); - - if (collectMetrix != null && !collectMetrix) - builder = builder.withoutMetrics(); - - if (jmxReporting != null && !jmxReporting) - builder = builder.withoutJMXReporting(); - - if (creds != null) - builder = builder.withCredentials(creds.getUser(), creds.getPassword()); - - if (loadBalancingPlc != null) - builder = builder.withLoadBalancingPolicy(loadBalancingPlc); - - if (reconnectionPlc != null) - builder = builder.withReconnectionPolicy(reconnectionPlc); - - if (retryPlc != null) - builder = builder.withRetryPolicy(retryPlc); - - if (addrTranslator != null) - builder = builder.withAddressTranslator(addrTranslator); - - if (speculativeExecutionPlc != null) - builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc); - - if (authProvider != null) - builder = builder.withAuthProvider(authProvider); - - if (poolingOptions != null) - builder = builder.withPoolingOptions(poolingOptions); - - if (sockOptions != null) - builder = builder.withSocketOptions(sockOptions); - - if (nettyOptions != null) - builder = builder.withNettyOptions(nettyOptions); - - return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(fetchSize); - out.writeObject(readConsistency); - out.writeObject(writeConsistency); - U.writeString(out, user); - U.writeString(out, pwd); - out.writeObject(port); - out.writeObject(contactPoints); - out.writeObject(contactPointsWithPorts); - 
out.writeObject(maxSchemaAgreementWaitSeconds); - out.writeObject(protoVer); - U.writeString(out, compression); - out.writeObject(useSSL); - out.writeObject(collectMetrix); - out.writeObject(jmxReporting); - out.writeObject(creds); - writeObject(out, loadBalancingPlc); - writeObject(out, reconnectionPlc); - writeObject(out, addrTranslator); - writeObject(out, speculativeExecutionPlc); - writeObject(out, authProvider); - writeObject(out, sslOptions); - writeObject(out, poolingOptions); - writeObject(out, sockOptions); - writeObject(out, nettyOptions); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fetchSize = (Integer)in.readObject(); - readConsistency = (ConsistencyLevel)in.readObject(); - writeConsistency = (ConsistencyLevel)in.readObject(); - user = U.readString(in); - pwd = U.readString(in); - port = (Integer)in.readObject(); - contactPoints = (List<InetAddress>)in.readObject(); - contactPointsWithPorts = (List<InetSocketAddress>)in.readObject(); - maxSchemaAgreementWaitSeconds = (Integer)in.readObject(); - protoVer = (Integer)in.readObject(); - compression = U.readString(in); - useSSL = (Boolean)in.readObject(); - collectMetrix = (Boolean)in.readObject(); - jmxReporting = (Boolean)in.readObject(); - creds = (Credentials)in.readObject(); - loadBalancingPlc = (LoadBalancingPolicy)readObject(in); - reconnectionPlc = (ReconnectionPolicy)readObject(in); - addrTranslator = (AddressTranslator)readObject(in); - speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in); - authProvider = (AuthProvider)readObject(in); - sslOptions = (SSLOptions)readObject(in); - poolingOptions = (PoolingOptions)readObject(in); - sockOptions = (SocketOptions)readObject(in); - nettyOptions = (NettyOptions)readObject(in); - } - - /** - * Helper method used to serialize class members - * @param out the stream to write the object to - * @param obj the object to be 
written - * @throws IOException Includes any I/O exceptions that may occur - */ - private void writeObject(ObjectOutput out, Object obj) throws IOException { - out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj); - } - - /** - * Helper method used to deserialize class members - * @param in the stream to read data from in order to restore the object - * @throws IOException Includes any I/O exceptions that may occur - * @throws ClassNotFoundException If the class for an object being restored cannot be found - * @return deserialized object - */ - private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException { - Object obj = in.readObject(); - return NULL_OBJECT.equals(obj) ? null : obj; - } - - /** - * Parses consistency level provided as string. - * - * @param level consistency level string. - * - * @return consistency level. - */ - private ConsistencyLevel parseConsistencyLevel(String level) { - if (level == null) - return null; - - try { - return ConsistencyLevel.valueOf(level.trim().toUpperCase()); - } - catch (Throwable e) { - throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e); - } - } - - /** - * Invalidates session. 
- */ - private synchronized void invalidate() { - ses = null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java deleted file mode 100644 index 46ebdc5..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.datasource; - -/** - * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values. - */ -public class PlainCredentials implements Credentials { - /** */ - private static final long serialVersionUID = 0L; - - /** User name. */ - private String user; - - /** User password. */ - private String pwd; - - /** - * Creates credentials object. - * - * @param user User name. 
- * @param pwd User password. - */ - public PlainCredentials(String user, String pwd) { - this.user = user; - this.pwd = pwd; - } - - /** {@inheritDoc} */ - @Override public String getUser() { - return user; - } - - /** {@inheritDoc} */ - @Override public String getPassword() { - return pwd; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java deleted file mode 100644 index d5003ae..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * Contains data source implementation - */ -package org.apache.ignite.cache.store.cassandra.datasource; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java deleted file mode 100644 index 46f5635..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * Contains {@link org.apache.ignite.cache.store.CacheStore} implementation backed by Cassandra database - */ -package org.apache.ignite.cache.store.cassandra; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java deleted file mode 100644 index 393dbe4..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.persistence; - -import java.beans.PropertyDescriptor; -import java.util.LinkedList; -import java.util.List; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; -import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -/** - * Stores persistence settings for Ignite cache key - */ -public class KeyPersistenceSettings extends PersistenceSettings { - /** Partition key XML tag. */ - private static final String PARTITION_KEY_ELEMENT = "partitionKey"; - - /** Cluster key XML tag. */ - private static final String CLUSTER_KEY_ELEMENT = "clusterKey"; - - /** POJO field XML tag. */ - private static final String FIELD_ELEMENT = "field"; - - /** POJO fields. */ - private List<PojoField> fields = new LinkedList<>(); - - /** Partition key fields. */ - private List<PojoField> partKeyFields = new LinkedList<>(); - - /** Cluster key fields. */ - private List<PojoField> clusterKeyFields = new LinkedList<>(); - - /** - * Creates key persistence settings object based on it's XML configuration. - * - * @param el XML element storing key persistence settings - */ - public KeyPersistenceSettings(Element el) { - super(el); - - if (!PersistenceStrategy.POJO.equals(getStrategy())) - return; - - NodeList keyElem = el.getElementsByTagName(PARTITION_KEY_ELEMENT); - - Element partKeysNode = keyElem != null ? (Element) keyElem.item(0) : null; - - Element clusterKeysNode = el.getElementsByTagName(CLUSTER_KEY_ELEMENT) != null ? 
- (Element)el.getElementsByTagName(CLUSTER_KEY_ELEMENT).item(0) : null; - - if (partKeysNode == null && clusterKeysNode != null) { - throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " + - "doesn't specify partition key mappings"); - } - - partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors()); - - if (partKeyFields == null || partKeyFields.isEmpty()) { - throw new IllegalStateException("Failed to initialize partition key fields for class '" + - getJavaClass().getName() + "'"); - } - - clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields)); - - fields = new LinkedList<>(); - fields.addAll(partKeyFields); - fields.addAll(clusterKeyFields); - - checkDuplicates(fields); - } - - /** {@inheritDoc} */ - @Override public List<PojoField> getFields() { - return fields; - } - - /** - * Returns Cassandra DDL for primary key. - * - * @return DDL statement. - */ - public String getPrimaryKeyDDL() { - StringBuilder partKey = new StringBuilder(); - - List<String> cols = getPartitionKeyColumns(); - for (String column : cols) { - if (partKey.length() != 0) - partKey.append(", "); - - partKey.append(column); - } - - StringBuilder clusterKey = new StringBuilder(); - - cols = getClusterKeyColumns(); - if (cols != null) { - for (String column : cols) { - if (clusterKey.length() != 0) - clusterKey.append(", "); - - clusterKey.append(column); - } - } - - return clusterKey.length() == 0 ? - " primary key ((" + partKey.toString() + "))" : - " primary key ((" + partKey.toString() + "), " + clusterKey.toString() + ")"; - } - - /** - * Returns Cassandra DDL for cluster key. - * - * @return Cluster key DDL. 
- */ - public String getClusteringDDL() { - StringBuilder builder = new StringBuilder(); - - for (PojoField field : clusterKeyFields) { - PojoKeyField.SortOrder sortOrder = ((PojoKeyField)field).getSortOrder(); - - if (sortOrder == null) - continue; - - if (builder.length() != 0) - builder.append(", "); - - boolean asc = PojoKeyField.SortOrder.ASC.equals(sortOrder); - - builder.append(field.getColumn()).append(" ").append(asc ? "asc" : "desc"); - } - - return builder.length() == 0 ? null : "clustering order by (" + builder.toString() + ")"; - } - - /** {@inheritDoc} */ - @Override protected String defaultColumnName() { - return "key"; - } - - /** - * Returns partition key columns of Cassandra table. - * - * @return List of column names. - */ - private List<String> getPartitionKeyColumns() { - List<String> cols = new LinkedList<>(); - - if (PersistenceStrategy.BLOB.equals(getStrategy()) || PersistenceStrategy.PRIMITIVE.equals(getStrategy())) { - cols.add(getColumn()); - return cols; - } - - if (partKeyFields != null) { - for (PojoField field : partKeyFields) - cols.add(field.getColumn()); - } - - return cols; - } - - /** - * Returns cluster key columns of Cassandra table. - * - * @return List of column names. - */ - private List<String> getClusterKeyColumns() { - List<String> cols = new LinkedList<>(); - - if (clusterKeyFields != null) { - for (PojoField field : clusterKeyFields) - cols.add(field.getColumn()); - } - - return cols; - } - - /** - * Extracts POJO fields specified in XML element. - * - * @param el XML element describing fields. - * @param descriptors POJO fields descriptors. - * @return List of {@code This} fields. 
- */ - private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) { - List<PojoField> list = new LinkedList<>(); - - if (el == null && (descriptors == null || descriptors.isEmpty())) - return list; - - if (el == null) { - for (PropertyDescriptor descriptor : descriptors) - list.add(new PojoKeyField(descriptor)); - - return list; - } - - NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT); - - int cnt = nodes == null ? 0 : nodes.getLength(); - - if (cnt == 0) { - throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " + - "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" + - CLUSTER_KEY_ELEMENT + "' element"); - } - - for (int i = 0; i < cnt; i++) { - PojoKeyField field = new PojoKeyField((Element)nodes.item(i), getJavaClass()); - - PropertyDescriptor desc = findPropertyDescriptor(descriptors, field.getName()); - - if (desc == null) { - throw new IllegalArgumentException("Specified POJO field '" + field.getName() + - "' doesn't exist in '" + getJavaClass().getName() + "' class"); - } - - list.add(field); - } - - return list; - } - - /** - * @return POJO field descriptors for partition key. - */ - private List<PropertyDescriptor> getPartitionKeyDescriptors() { - List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), - AffinityKeyMapped.class, true); - - return primitivePropDescriptors != null && !primitivePropDescriptors.isEmpty() ? - primitivePropDescriptors : - PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); - } - - /** - * @return POJO field descriptors for cluster key. 
- */ - private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) { - List<PropertyDescriptor> primitivePropDescriptors = - PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); - - if (primitivePropDescriptors == null || primitivePropDescriptors.isEmpty() || - partKeyFields.size() == primitivePropDescriptors.size()) - return null; - - for (PojoField field : partKeyFields) { - for (int i = 0; i < primitivePropDescriptors.size(); i++) { - if (primitivePropDescriptors.get(i).getName().equals(field.getName())) { - primitivePropDescriptors.remove(i); - break; - } - } - } - - return primitivePropDescriptors; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java deleted file mode 100644 index 2c43ed4..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.store.cassandra.persistence;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/**
 * Stores persistence settings for Ignite cache key and value.
 * <p>
 * Settings are parsed from an XML document whose root element is {@code persistence} and which carries
 * {@code keyspace}, {@code table} and (optionally) {@code ttl} attributes, plus {@code keyPersistence},
 * {@code valuePersistence}, {@code keyspaceOptions} and {@code tableOptions} child elements.
 */
public class KeyValuePersistenceSettings implements Serializable {
    /**
     * Default Cassandra keyspace options which should be used to create new keyspace.
     * <ul>
     *     <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/>
     *          If your Cassandra cluster deployed across multiple data centers it's better to use
     *          <b>NetworkTopologyStrategy</b>.
     *     </li>
     *     <li> Three replicas will be created for each data block. </li>
     *     <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li>
     * </ul>
     */
    private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " +
        "'replication_factor' : 3} and durable_writes = true";

    /** Xml attribute specifying Cassandra keyspace to use. */
    private static final String KEYSPACE_ATTR = "keyspace";

    /** Xml attribute specifying Cassandra table to use. */
    private static final String TABLE_ATTR = "table";

    /** Xml attribute specifying ttl (time to live) for rows inserted in Cassandra. */
    private static final String TTL_ATTR = "ttl";

    /** Root xml element containing persistence settings specification. */
    private static final String PERSISTENCE_NODE = "persistence";

    /** Xml element specifying Cassandra keyspace options. */
    private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions";

    /** Xml element specifying Cassandra table options. */
    private static final String TABLE_OPTIONS_NODE = "tableOptions";

    /** Xml element specifying Ignite cache key persistence settings. */
    private static final String KEY_PERSISTENCE_NODE = "keyPersistence";

    /** Xml element specifying Ignite cache value persistence settings. */
    private static final String VALUE_PERSISTENCE_NODE = "valuePersistence";

    /**
     * TTL (time to live) for rows inserted into Cassandra table, see
     * <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>.
     * Stays {@code null} when the {@code ttl} attribute is not specified.
     */
    private Integer ttl;

    /** Cassandra keyspace (analog of tablespace in relational databases). */
    private String keyspace;

    /** Cassandra table. */
    private String tbl;

    /**
     * Cassandra table creation options, see
     * <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>.
     */
    private String tblOptions;

    /**
     * Cassandra keyspace creation options, see
     * <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>.
     */
    private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS;

    /** Persistence settings for Ignite cache keys. */
    private KeyPersistenceSettings keyPersistenceSettings;

    /** Persistence settings for Ignite cache values. */
    private ValuePersistenceSettings valPersistenceSettings;

    /**
     * Constructs Ignite cache key/value persistence settings.
     *
     * @param settings string containing xml with persistence settings for Ignite cache key/value
     * @throws IllegalArgumentException if the xml can not be parsed or describes incorrect settings.
     */
    @SuppressWarnings("UnusedDeclaration")
    public KeyValuePersistenceSettings(String settings) {
        init(settings);
    }

    /**
     * Constructs Ignite cache key/value persistence settings.
     *
     * @param settingsFile xml file with persistence settings for Ignite cache key/value
     * @throws IgniteException if the file can not be opened or read.
     * @throws IllegalArgumentException if the xml can not be parsed or describes incorrect settings.
     */
    @SuppressWarnings("UnusedDeclaration")
    public KeyValuePersistenceSettings(File settingsFile) {
        InputStream in;

        try {
            in = new FileInputStream(settingsFile);
        }
        catch (IOException e) {
            throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " +
                settingsFile.getAbsolutePath(), e);
        }

        // loadSettings() closes the stream.
        init(loadSettings(in));
    }

    /**
     * Constructs Ignite cache key/value persistence settings.
     *
     * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value
     * @throws IgniteException if the resource can not be opened or read.
     * @throws IllegalArgumentException if the xml can not be parsed or describes incorrect settings.
     */
    public KeyValuePersistenceSettings(Resource settingsRsrc) {
        InputStream in;

        try {
            in = settingsRsrc.getInputStream();
        }
        catch (IOException e) {
            throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e);
        }

        // loadSettings() closes the stream.
        init(loadSettings(in));
    }

    /**
     * Returns ttl to use for while inserting new rows into Cassandra table.
     *
     * @return ttl, or {@code null} if it was not specified.
     */
    public Integer getTTL() {
        return ttl;
    }

    /**
     * Returns Cassandra keyspace to use.
     *
     * @return keyspace.
     */
    public String getKeyspace() {
        return keyspace;
    }

    /**
     * Returns Cassandra table to use.
     *
     * @return table.
     */
    public String getTable() {
        return tbl;
    }

    /**
     * Returns full name of Cassandra table to use (including keyspace).
     *
     * @return full table name in format "keyspace.table".
     */
    public String getTableFullName() {
        return keyspace + "." + tbl;
    }

    /**
     * Returns persistence settings for Ignite cache keys.
     *
     * @return keys persistence settings.
     */
    public KeyPersistenceSettings getKeyPersistenceSettings() {
        return keyPersistenceSettings;
    }

    /**
     * Returns persistence settings for Ignite cache values.
     *
     * @return values persistence settings.
     */
    public ValuePersistenceSettings getValuePersistenceSettings() {
        return valPersistenceSettings;
    }

    /**
     * Returns list of POJO fields to be mapped to Cassandra table columns: key fields first, then value fields.
     *
     * @return POJO fields list (never {@code null}, possibly empty).
     */
    @SuppressWarnings("UnusedDeclaration")
    public List<PojoField> getFields() {
        List<PojoField> fields = new LinkedList<>();

        // init() treats the field lists as possibly null, so guard against it here as well.
        List<PojoField> keyFields = keyPersistenceSettings.getFields();

        if (keyFields != null)
            fields.addAll(keyFields);

        List<PojoField> valFields = valPersistenceSettings.getFields();

        if (valFields != null)
            fields.addAll(valFields);

        return fields;
    }

    /**
     * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns.
     *
     * @return POJO fields list.
     */
    @SuppressWarnings("UnusedDeclaration")
    public List<PojoField> getKeyFields() {
        return keyPersistenceSettings.getFields();
    }

    /**
     * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns.
     *
     * @return POJO fields list.
     */
    @SuppressWarnings("UnusedDeclaration")
    public List<PojoField> getValueFields() {
        return valPersistenceSettings.getFields();
    }

    /**
     * Returns DDL statement to create Cassandra keyspace.
     * <p>
     * Keyspace options are appended after a {@code with} keyword, which is added only if the options do not
     * already start with it. Blank options are ignored.
     *
     * @return Keyspace DDL statement terminated with ';'.
     */
    public String getKeyspaceDDLStatement() {
        StringBuilder builder = new StringBuilder();
        builder.append("create keyspace if not exists ").append(keyspace);

        String options = keyspaceOptions == null ? "" : keyspaceOptions.trim();

        // Blank options must not produce a dangling "with" clause.
        if (!options.isEmpty()) {
            if (!startsWithWith(options))
                builder.append("\nwith");

            builder.append(" ").append(options);
        }

        String statement = builder.toString().trim().replaceAll(" +", " ");

        return statement.endsWith(";") ? statement : statement + ";";
    }

    /**
     * Returns DDL statement to create Cassandra table.
     * <p>
     * The statement is built from key and value columns DDL, primary key DDL and table options combined
     * (using {@code and}) with clustering DDL provided by key persistence settings.
     *
     * @return Table DDL statement terminated with ';'.
     */
    public String getTableDDLStatement() {
        String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL();

        String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL();

        String clusteringDDL = keyPersistenceSettings.getClusteringDDL();

        String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : "";

        if (clusteringDDL != null && !clusteringDDL.isEmpty())
            optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL;

        optionsDDL = optionsDDL.trim();

        if (!optionsDDL.isEmpty() && !startsWithWith(optionsDDL))
            optionsDDL = "with " + optionsDDL;

        StringBuilder builder = new StringBuilder();

        builder.append("create table if not exists ").append(keyspace).append(".").append(tbl);
        builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)");

        if (!optionsDDL.isEmpty())
            builder.append(" \n").append(optionsDDL);

        String tblDDL = builder.toString().trim().replaceAll(" +", " ");

        return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";";
    }

    /**
     * Returns DDL statements to create Cassandra table secondary indexes.
     *
     * @return DDL statements to create secondary indexes (never {@code null}, possibly empty).
     */
    public List<String> getIndexDDLStatements() {
        List<String> idxDDLs = new LinkedList<>();

        List<PojoField> fields = valPersistenceSettings.getFields();

        // init() treats the value field list as possibly null: no fields means no indexes.
        if (fields == null)
            return idxDDLs;

        for (PojoField field : fields) {
            PojoValueField valField = (PojoValueField)field;

            if (valField.isIndexed())
                idxDDLs.add(valField.getIndexDDL(keyspace, tbl));
        }

        return idxDDLs;
    }

    /**
     * Checks whether DDL options already start with the {@code with} keyword (case-insensitive).
     * <p>
     * The comparison uses {@link Locale#ROOT}: with the default locale the result depends on the host
     * configuration (e.g. under a Turkish locale "WITH" is lower-cased to "w\u0131th" and is not recognized).
     *
     * @param options Trimmed DDL options.
     * @return {@code true} if options start with {@code with}.
     */
    private static boolean startsWithWith(String options) {
        return options.toLowerCase(Locale.ROOT).startsWith("with");
    }

    /**
     * Loads Ignite cache persistence settings from resource. Both the reader and the stream are closed
     * before the method returns.
     *
     * @param in Input stream.
     * @return String containing xml with Ignite cache persistence settings.
     * @throws IgniteException if the stream can not be read.
     */
    private String loadSettings(InputStream in) {
        StringBuilder settings = new StringBuilder();
        BufferedReader reader = null;

        try {
            // Decode as UTF-8 (the XML default) so the result does not depend on the platform charset.
            reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));

            String line = reader.readLine();

            while (line != null) {
                if (settings.length() != 0)
                    settings.append(SystemHelper.LINE_SEPARATOR);

                settings.append(line);

                line = reader.readLine();
            }
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e);
        }
        finally {
            U.closeQuiet(reader);
            U.closeQuiet(in);
        }

        return settings.toString();
    }

    /**
     * Extracts integer value of the specified attribute.
     *
     * @param elem Element with data.
     * @param attr Attribute name.
     * @return Numeric value for specified attribute.
     * @throws IllegalArgumentException if attribute value is not a valid integer.
     */
    private int extractIntAttribute(Element elem, String attr) {
        String val = elem.getAttribute(attr).trim();

        try {
            return Integer.parseInt(val);
        }
        catch (NumberFormatException e) {
            // Keep the original exception as a cause to simplify troubleshooting.
            throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute", e);
        }
    }

    /**
     * Initializes persistence settings from XML string.
     *
     * @param settings XML string containing Ignite cache persistence settings configuration.
     * @throws IllegalArgumentException if the xml can not be parsed, mandatory attributes or elements are
     *      missing, or the same column is used both for key and value.
     */
    @SuppressWarnings("IfCanBeSwitch")
    private void init(String settings) {
        Document doc;

        try {
            // NOTE(review): the parser is created with default features; if settings may ever come from
            // an untrusted source, consider disabling DTDs/external entities - confirm with callers.
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            doc = builder.parse(new InputSource(new StringReader(settings)));
        }
        catch (Throwable e) {
            throw new IllegalArgumentException("Failed to parse persistence settings:" +
                SystemHelper.LINE_SEPARATOR + settings, e);
        }

        Element root = doc.getDocumentElement();

        if (!PERSISTENCE_NODE.equals(root.getNodeName())) {
            throw new IllegalArgumentException("Incorrect persistence settings specified. " +
                "Root XML element should be 'persistence'");
        }

        if (!root.hasAttribute(KEYSPACE_ATTR)) {
            throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR +
                "' attribute should be specified");
        }

        if (!root.hasAttribute(TABLE_ATTR)) {
            throw new IllegalArgumentException("Incorrect persistence settings '" + TABLE_ATTR +
                "' attribute should be specified");
        }

        keyspace = root.getAttribute(KEYSPACE_ATTR).trim();
        tbl = root.getAttribute(TABLE_ATTR).trim();

        if (root.hasAttribute(TTL_ATTR))
            ttl = extractIntAttribute(root, TTL_ATTR);

        if (!root.hasChildNodes()) {
            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                "there are no key and value persistence settings specified");
        }

        NodeList children = root.getChildNodes();
        int cnt = children.getLength();

        for (int i = 0; i < cnt; i++) {
            Node node = children.item(i);

            // Skip text, comments and other non-element nodes.
            if (node.getNodeType() != Node.ELEMENT_NODE)
                continue;

            Element el = (Element)node;
            String nodeName = el.getNodeName();

            if (nodeName.equals(TABLE_OPTIONS_NODE)) {
                // Flatten multi-line options into a single line.
                tblOptions = el.getTextContent();
                tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
            }
            else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) {
                // Flatten multi-line options into a single line.
                keyspaceOptions = el.getTextContent();
                keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
            }
            else if (nodeName.equals(KEY_PERSISTENCE_NODE))
                keyPersistenceSettings = new KeyPersistenceSettings(el);
            else if (nodeName.equals(VALUE_PERSISTENCE_NODE))
                valPersistenceSettings = new ValuePersistenceSettings(el);
        }

        if (keyPersistenceSettings == null) {
            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                "there are no key persistence settings specified");
        }

        if (valPersistenceSettings == null) {
            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                "there are no value persistence settings specified");
        }

        List<PojoField> keyFields = keyPersistenceSettings.getFields();
        List<PojoField> valFields = valPersistenceSettings.getFields();

        if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) &&
            (keyFields == null || keyFields.isEmpty())) {
            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                "there are no key fields found");
        }

        if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) &&
            (valFields == null || valFields.isEmpty())) {
            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                "there are no value fields found");
        }

        // Nothing to cross-check if either side has no explicit fields.
        if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty())
            return;

        // The same Cassandra column can't be used both for key and value.
        for (PojoField keyField : keyFields) {
            for (PojoField valField : valFields) {
                if (keyField.getColumn().equals(valField.getColumn())) {
                    throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
                        "key column '" + keyField.getColumn() + "' also specified as a value column");
                }
            }
        }
    }
}