http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java deleted file mode 100644 index e734ca3..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.ignite.cache.store.cassandra.persistence;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
import org.apache.ignite.cache.store.cassandra.serializer.Serializer;

/**
 * Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes.
 * Handles all the mappings to/from Java classes into Cassandra and is responsible for all the details
 * of how Java objects should be written/loaded to/from Cassandra.
 * <p>
 * The CQL statements are built lazily on first request and cached in instance fields.
 */
public class PersistenceController {
    /** Ignite cache key/value persistence settings. */
    private KeyValuePersistenceSettings persistenceSettings;

    /** CQL statement to insert row into Cassandra table. */
    private String writeStatement;

    /** CQL statement to delete row from Cassandra table. */
    private String delStatement;

    /** CQL statement to select value fields from Cassandra table. */
    private String loadStatement;

    /** CQL statement to select key/value fields from Cassandra table. */
    private String loadStatementWithKeyFields;

    /**
     * Constructs persistence controller from Ignite cache persistence settings.
     *
     * @param settings persistence settings.
     * @throws IllegalArgumentException if {@code settings} is {@code null}.
     */
    public PersistenceController(KeyValuePersistenceSettings settings) {
        if (settings == null)
            throw new IllegalArgumentException("Persistent settings can't be null");

        this.persistenceSettings = settings;
    }

    /**
     * Returns Ignite cache persistence settings.
     *
     * @return persistence settings.
     */
    public KeyValuePersistenceSettings getPersistenceSettings() {
        return persistenceSettings;
    }

    /**
     * Returns Cassandra keyspace to use.
     *
     * @return keyspace.
     */
    public String getKeyspace() {
        return persistenceSettings.getKeyspace();
    }

    /**
     * Returns Cassandra table to use.
     *
     * @return table.
     */
    public String getTable() {
        return persistenceSettings.getTable();
    }

    /**
     * Returns CQL statement to insert row into Cassandra table.
     * <p>
     * The statement lists key columns followed by value columns, with one {@code ?} placeholder per column,
     * and carries a {@code using ttl} clause when the persistence settings define a TTL.
     *
     * @return CQL statement.
     */
    public String getWriteStatement() {
        if (writeStatement != null)
            return writeStatement;

        List<String> cols = getKeyValueColumns();

        StringBuilder colsList = new StringBuilder();
        StringBuilder questionsList = new StringBuilder();

        for (String column : cols) {
            if (colsList.length() != 0) {
                colsList.append(", ");
                questionsList.append(",");
            }

            colsList.append(column);
            questionsList.append("?");
        }

        StringBuilder statement = new StringBuilder("insert into ")
            .append(persistenceSettings.getKeyspace()).append(".").append(persistenceSettings.getTable())
            .append(" (").append(colsList).append(") values (").append(questionsList).append(")");

        if (persistenceSettings.getTTL() != null)
            statement.append(" using ttl ").append(persistenceSettings.getTTL());

        statement.append(";");

        // Assemble the whole statement locally and publish it with a single assignment: the cached field
        // must never hold a statement that is still missing the TTL clause or the terminating ';',
        // otherwise another caller passing the null check above could pick up the incomplete text.
        String res = statement.toString();

        writeStatement = res;

        return res;
    }

    /**
     * Returns CQL statement to delete row from Cassandra table.
     * <p>
     * The {@code where} clause contains one {@code column=?} condition per key column.
     *
     * @return CQL statement.
     */
    public String getDeleteStatement() {
        if (delStatement != null)
            return delStatement;

        List<String> cols = getKeyColumns();

        StringBuilder statement = new StringBuilder();

        for (String column : cols) {
            if (statement.length() != 0)
                statement.append(" and ");

            statement.append(column).append("=?");
        }

        statement.append(";");

        delStatement = "delete from " +
            persistenceSettings.getKeyspace() + "." +
            persistenceSettings.getTable() + " where " +
            statement.toString();

        return delStatement;
    }

    /**
     * Returns CQL statement to select key/value fields from Cassandra table.
     * <p>
     * Both variants (with and without key columns in the select list) are built together on the first call
     * and share the same {@code from ... where ...} tail.
     *
     * @param includeKeyFields whether to include/exclude key fields from the returned row.
     *
     * @return CQL statement.
     */
    public String getLoadStatement(boolean includeKeyFields) {
        if (loadStatement != null && loadStatementWithKeyFields != null)
            return includeKeyFields ? loadStatementWithKeyFields : loadStatement;

        List<String> valCols = getValueColumns();

        List<String> keyCols = getKeyColumns();

        StringBuilder hdrWithKeyFields = new StringBuilder("select ");

        for (int i = 0; i < keyCols.size(); i++) {
            if (i > 0)
                hdrWithKeyFields.append(", ");

            hdrWithKeyFields.append(keyCols.get(i));
        }

        StringBuilder hdr = new StringBuilder("select ");

        for (int i = 0; i < valCols.size(); i++) {
            if (i > 0)
                hdr.append(", ");

            // Value columns always follow the key columns in this header, hence the unconditional separator.
            hdrWithKeyFields.append(",");

            hdr.append(valCols.get(i));
            hdrWithKeyFields.append(valCols.get(i));
        }

        StringBuilder statement = new StringBuilder();

        statement.append(" from ");
        statement.append(persistenceSettings.getKeyspace());
        statement.append(".").append(persistenceSettings.getTable());
        statement.append(" where ");

        for (int i = 0; i < keyCols.size(); i++) {
            if (i > 0)
                statement.append(" and ");

            statement.append(keyCols.get(i)).append("=?");
        }

        statement.append(";");

        loadStatement = hdr.toString() + statement.toString();
        loadStatementWithKeyFields = hdrWithKeyFields.toString() + statement.toString();

        return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
    }

    /**
     * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}.
     *
     * @param statement statement to which key object should be bound.
     * @param key key object.
     *
     * @return statement with bound key.
     */
    public BoundStatement bindKey(PreparedStatement statement, Object key) {
        KeyPersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();

        Object[] values = getBindingValues(settings.getStrategy(),
            settings.getSerializer(), settings.getFields(), key);

        return statement.bind(values);
    }

    /**
     * Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}.
     * Key values are bound first, value values after them.
     *
     * @param statement statement to which key and value object should be bound.
     * @param key key object.
     * @param val value object.
     *
     * @return statement with bound key and value.
     */
    public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) {
        KeyPersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings();
        Object[] keyValues = getBindingValues(keySettings.getStrategy(),
            keySettings.getSerializer(), keySettings.getFields(), key);

        ValuePersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings();
        Object[] valValues = getBindingValues(valSettings.getStrategy(),
            valSettings.getSerializer(), valSettings.getFields(), val);

        Object[] values = new Object[keyValues.length + valValues.length];

        System.arraycopy(keyValues, 0, values, 0, keyValues.length);
        System.arraycopy(valValues, 0, values, keyValues.length, valValues.length);

        return statement.bind(values);
    }

    /**
     * Builds Ignite cache key object from returned Cassandra table row.
     *
     * @param row Cassandra table row.
     *
     * @return key object, or {@code null} if {@code row} is {@code null}.
     */
    @SuppressWarnings("UnusedDeclaration")
    public Object buildKeyObject(Row row) {
        return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
    }

    /**
     * Builds Ignite cache value object from Cassandra table row.
     *
     * @param row Cassandra table row.
     *
     * @return value object, or {@code null} if {@code row} is {@code null}.
     */
    public Object buildValueObject(Row row) {
        return buildObject(row, persistenceSettings.getValuePersistenceSettings());
    }

    /**
     * Builds object from Cassandra table row.
     * <p>
     * For the PRIMITIVE strategy the single column value is returned, for the BLOB strategy the column bytes
     * are deserialized, otherwise a new instance of the configured class is created through its no-argument
     * constructor and populated field by field.
     *
     * @param row Cassandra table row.
     * @param settings persistence settings to use.
     *
     * @return object, or {@code null} if {@code row} is {@code null}.
     * @throws IgniteException if the object can't be instantiated using reflection.
     */
    private Object buildObject(Row row, PersistenceSettings settings) {
        if (row == null)
            return null;

        PersistenceStrategy stgy = settings.getStrategy();

        Class clazz = settings.getJavaClass();

        String col = settings.getColumn();

        List<PojoField> fields = settings.getFields();

        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
            return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);

        if (PersistenceStrategy.BLOB.equals(stgy))
            return settings.getSerializer().deserialize(row.getBytes(col));

        Object obj;

        try {
            obj = clazz.newInstance();
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
        }

        for (PojoField field : fields)
            field.setValueFromRow(row, obj, settings.getSerializer());

        return obj;
    }

    /**
     * Extracts field values from POJO object and converts them into Java types
     * which could be mapped to Cassandra types.
     *
     * @param stgy persistence strategy to use.
     * @param serializer serializer to use for BLOBs.
     * @param fields fields whose values should be extracted.
     * @param obj object instance whose field values should be extracted.
     *
     * @return array of object field values converted into Java object instances having Cassandra compatible types
     * @throws IllegalArgumentException if the PRIMITIVE strategy is used for an object that has no Cassandra
     *      type mapping or that is a {@link ByteBuffer} or {@code byte[]}.
     */
    private Object[] getBindingValues(PersistenceStrategy stgy, Serializer serializer, List<PojoField> fields, Object obj) {
        if (PersistenceStrategy.PRIMITIVE.equals(stgy)) {
            if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null ||
                obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) {
                throw new IllegalArgumentException("Couldn't deserialize instance of class '" +
                    obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case.");
            }

            return new Object[] {obj};
        }

        if (PersistenceStrategy.BLOB.equals(stgy))
            return new Object[] {serializer.serialize(obj)};

        Object[] values = new Object[fields.size()];

        int i = 0;

        for (PojoField field : fields) {
            Object val = field.getValueFromObject(obj, serializer);

            // Raw byte arrays are handed over wrapped into a ByteBuffer.
            if (val instanceof byte[])
                val = ByteBuffer.wrap((byte[]) val);

            values[i] = val;

            i++;
        }

        return values;
    }

    /**
     * Returns list of Cassandra table columns mapped to Ignite cache key and value fields
     * (key columns first, value columns after them).
     *
     * @return list of column names
     */
    private List<String> getKeyValueColumns() {
        List<String> cols = getKeyColumns();

        cols.addAll(getValueColumns());

        return cols;
    }

    /**
     * Returns list of Cassandra table columns mapped to Ignite cache key fields
     *
     * @return list of column names
     */
    private List<String> getKeyColumns() {
        return getColumns(persistenceSettings.getKeyPersistenceSettings());
    }

    /**
     * Returns list of Cassandra table columns mapped to Ignite cache value fields
     *
     * @return list of column names
     */
    private List<String> getValueColumns() {
        return getColumns(persistenceSettings.getValuePersistenceSettings());
    }

    /**
     * Returns list of Cassandra table columns based on persistence strategy to use: the single configured
     * column for any strategy other than POJO, one column per POJO field otherwise.
     *
     * @param settings persistence settings to take the columns from.
     *
     * @return newly created, modifiable list of column names
     */
    private List<String> getColumns(PersistenceSettings settings) {
        List<String> cols = new LinkedList<>();

        if (!PersistenceStrategy.POJO.equals(settings.getStrategy())) {
            cols.add(settings.getColumn());
            return cols;
        }

        for (PojoField field : settings.getFields())
            cols.add(field.getColumn());

        return cols;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java deleted file mode 100644 index 20d790a..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.ignite.cache.store.cassandra.persistence;

import com.datastax.driver.core.DataType;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
import org.w3c.dom.Element;

/**
 * Stores persistence settings, which describe how particular key/value
 * from Ignite cache should be stored in Cassandra.
 */
public abstract class PersistenceSettings implements Serializable {
    /** Xml attribute specifying persistence strategy. */
    private static final String STRATEGY_ATTR = "strategy";

    /** Xml attribute specifying Cassandra column name. */
    private static final String COLUMN_ATTR = "column";

    /** Xml attribute specifying BLOB serializer to use. */
    private static final String SERIALIZER_ATTR = "serializer";

    /** Xml attribute specifying java class of the object to be persisted. */
    private static final String CLASS_ATTR = "class";

    /** Persistence strategy to use. */
    private PersistenceStrategy stgy;

    /** Java class of the object to be persisted. */
    private Class javaCls;

    /** Cassandra table column name where object should be persisted in
     *  case of using BLOB or PRIMITIVE persistence strategy. */
    private String col;

    /** Serializer for BLOBs. */
    private Serializer serializer = new JavaSerializer();

    /**
     * Extracts property descriptor from the descriptors list by its name.
     *
     * @param descriptors descriptors list.
     * @param propName property name.
     *
     * @return property descriptor, or {@code null} if the list is {@code null}/empty, the name is
     *      {@code null}/blank or no descriptor has that exact name.
     */
    public static PropertyDescriptor findPropertyDescriptor(List<PropertyDescriptor> descriptors, String propName) {
        if (descriptors == null || descriptors.isEmpty() || propName == null || propName.trim().isEmpty())
            return null;

        for (PropertyDescriptor descriptor : descriptors) {
            if (descriptor.getName().equals(propName))
                return descriptor;
        }

        return null;
    }

    /**
     * Constructs persistence settings from corresponding XML element.
     * <p>
     * Validates the combination of the {@code strategy}, {@code class}, {@code column} and
     * {@code serializer} attributes and falls back to {@link #defaultColumnName()} when no column is
     * given for the BLOB or PRIMITIVE strategy.
     *
     * @param el xml element containing persistence settings configuration.
     * @throws IllegalArgumentException if the element is {@code null} or its attributes are inconsistent.
     */
    @SuppressWarnings("unchecked")
    public PersistenceSettings(Element el) {
        if (el == null)
            throw new IllegalArgumentException("DOM element representing key/value persistence object can't be null");

        if (!el.hasAttribute(STRATEGY_ATTR)) {
            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
                STRATEGY_ATTR + "' attribute");
        }

        try {
            // Locale-independent upper-casing: with the default locale, e.g. Turkish, "primitive" would be
            // turned into a string with a dotted capital I and never match the enum constant.
            stgy = PersistenceStrategy.valueOf(el.getAttribute(STRATEGY_ATTR).trim().toUpperCase(java.util.Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Incorrect persistence strategy specified: " +
                el.getAttribute(STRATEGY_ATTR), e);
        }

        if (!el.hasAttribute(CLASS_ATTR) && !PersistenceStrategy.BLOB.equals(stgy)) {
            throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
                CLASS_ATTR + "' attribute or have BLOB persistence strategy");
        }

        try {
            javaCls = el.hasAttribute(CLASS_ATTR) ? getClassInstance(el.getAttribute(CLASS_ATTR).trim()) : null;
        }
        catch (Throwable e) {
            throw new IllegalArgumentException("Incorrect java class specified '" + el.getAttribute(CLASS_ATTR) + "' " +
                "for Cassandra persistence", e);
        }

        if (!PersistenceStrategy.BLOB.equals(stgy) &&
            (ByteBuffer.class.equals(javaCls) || byte[].class.equals(javaCls))) {
            throw new IllegalArgumentException("Java class '" + el.getAttribute(CLASS_ATTR) + "' " +
                "specified could only be persisted using BLOB persistence strategy");
        }

        if (PersistenceStrategy.PRIMITIVE.equals(stgy) &&
            PropertyMappingHelper.getCassandraType(javaCls) == null) {
            throw new IllegalArgumentException("Current implementation doesn't support persisting '" +
                javaCls.getName() + "' object using PRIMITIVE strategy");
        }

        if (PersistenceStrategy.POJO.equals(stgy)) {
            if (javaCls == null)
                throw new IllegalStateException("Object java class should be specified for POJO persistence strategy");

            try {
                javaCls.getConstructor();
            }
            catch (Throwable e) {
                throw new IllegalArgumentException("Java class '" + javaCls.getName() + "' couldn't be used as POJO " +
                    "cause it doesn't have no arguments constructor", e);
            }
        }

        if (el.hasAttribute(COLUMN_ATTR)) {
            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.PRIMITIVE.equals(stgy)) {
                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
                    "'" + COLUMN_ATTR + "' attribute is only applicable for PRIMITIVE or BLOB strategy");
            }

            col = el.getAttribute(COLUMN_ATTR).trim();
        }

        if (el.hasAttribute(SERIALIZER_ATTR)) {
            if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.POJO.equals(stgy)) {
                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
                    "'" + SERIALIZER_ATTR + "' attribute is only applicable for BLOB and POJO strategies");
            }

            Object obj = newObjectInstance(el.getAttribute(SERIALIZER_ATTR).trim());

            if (!(obj instanceof Serializer)) {
                throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
                    "serializer class '" + el.getAttribute(SERIALIZER_ATTR) + "' doesn't implement '" +
                    Serializer.class.getName() + "' interface");
            }

            serializer = (Serializer)obj;
        }

        // NOTE(review): defaultColumnName() is abstract and invoked from the constructor, so subclass
        // implementations must not rely on their own fields being initialized at this point.
        if ((PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy)) && col == null)
            col = defaultColumnName();
    }

    /**
     * Returns java class of the object to be persisted.
     *
     * @return java class (may be {@code null} when the BLOB strategy is configured without a class).
     */
    public Class getJavaClass() {
        return javaCls;
    }

    /**
     * Returns persistence strategy to use.
     *
     * @return persistence strategy.
     */
    public PersistenceStrategy getStrategy() {
        return stgy;
    }

    /**
     * Returns Cassandra table column name where object should be persisted in
     * case of using BLOB or PRIMITIVE persistence strategy.
     *
     * @return column name.
     */
    public String getColumn() {
        return col;
    }

    /**
     * Returns serializer to be used for BLOBs.
     *
     * @return serializer.
     */
    public Serializer getSerializer() {
        return serializer;
    }

    /**
     * Returns list of POJO fields to be persisted.
     *
     * @return list of fields.
     */
    public abstract List<PojoField> getFields();

    /**
     * Returns Cassandra table columns DDL, corresponding to POJO fields which should be persisted.
     * For the BLOB and PRIMITIVE strategies the DDL of the single configured column is returned instead.
     *
     * @return DDL statement for Cassandra table fields
     * @throws IllegalStateException if no POJO fields are available for the POJO strategy.
     */
    public String getTableColumnsDDL() {
        if (PersistenceStrategy.BLOB.equals(stgy))
            return " " + col + " " + DataType.Name.BLOB.toString();

        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
            return " " + col + " " + PropertyMappingHelper.getCassandraType(javaCls);

        StringBuilder builder = new StringBuilder();

        for (PojoField field : getFields()) {
            if (builder.length() > 0)
                builder.append(",\n");

            builder.append(" ").append(field.getColumnDDL());
        }

        if (builder.length() == 0) {
            throw new IllegalStateException("There are no POJO fields found for '" + javaCls.toString()
                + "' class to be presented as a Cassandra primary key");
        }

        return builder.toString();
    }

    /**
     * Returns default name for Cassandra column (if it's not specified explicitly).
     *
     * @return column name
     */
    protected abstract String defaultColumnName();

    /**
     * Checks if there are POJO fields with the same name or same Cassandra column specified in persistence settings
     *
     * @param fields list of fields to be persisted into Cassandra
     * @throws IllegalArgumentException if two fields share a name or a column.
     */
    protected void checkDuplicates(List<PojoField> fields) {
        if (fields == null || fields.isEmpty())
            return;

        for (PojoField field1 : fields) {
            // Every field matches itself exactly once, so only a second match indicates a duplicate.
            boolean sameNames = false;
            boolean sameCols = false;

            for (PojoField field2 : fields) {
                if (field1.getName().equals(field2.getName())) {
                    if (sameNames) {
                        throw new IllegalArgumentException("Incorrect Cassandra key persistence settings, " +
                            "two POJO fields with the same name '" + field1.getName() + "' specified");
                    }

                    sameNames = true;
                }

                if (field1.getColumn().equals(field2.getColumn())) {
                    if (sameCols) {
                        throw new IllegalArgumentException("Incorrect Cassandra persistence settings, " +
                            "two POJO fields with the same column '" + field1.getColumn() + "' specified");
                    }

                    sameCols = true;
                }
            }
        }
    }

    /**
     * Instantiates Class object for particular class, trying in turn the default lookup, the thread
     * context class loader, the class loader of this class and the system class loader.
     *
     * @param clazz class name
     * @return Class object
     * @throws IgniteException if none of the class loaders can find the class.
     */
    private Class getClassInstance(String clazz) {
        try {
            return Class.forName(clazz);
        }
        catch (ClassNotFoundException ignored) {
            // Fall through to the next class loader.
        }

        try {
            return Class.forName(clazz, true, Thread.currentThread().getContextClassLoader());
        }
        catch (ClassNotFoundException ignored) {
            // Fall through to the next class loader.
        }

        try {
            return Class.forName(clazz, true, PersistenceSettings.class.getClassLoader());
        }
        catch (ClassNotFoundException ignored) {
            // Fall through to the next class loader.
        }

        try {
            return Class.forName(clazz, true, ClassLoader.getSystemClassLoader());
        }
        catch (ClassNotFoundException ignored) {
            // All lookups failed, reported below.
        }

        throw new IgniteException("Failed to load class '" + clazz + "' using reflection");
    }

    /**
     * Creates new object instance of particular class
     *
     * @param clazz class name
     * @return object
     * @throws IgniteException if the class can't be loaded or instantiated using its default constructor.
     */
    private Object newObjectInstance(String clazz) {
        try {
            return getClassInstance(clazz).newInstance();
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to instantiate class '" + clazz + "' using default constructor", e);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java deleted file mode 100644 index 4b1e2d8..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.ignite.cache.store.cassandra.persistence;

/**
 * Persistence strategy which defines how object data is persisted into Cassandra.
 */
public enum PersistenceStrategy {
    /**
     * The object value is stored as is, mapped to a Cassandra table column of the corresponding type.
     * <p>
     * Intended for primitive java types (such as Integer, String, Long, etc.) which could be directly
     * mapped to appropriate Cassandra types.
     */
    PRIMITIVE,

    /**
     * The object value is stored as a BLOB, mapped to a Cassandra table column of blob type.
     * Applicable to any java type; conversion of the java object to BLOB is handled by the specified serializer.
     * <p>
     * Available serializer implementations:
     * <ul>
     *     <li>
     *         {@code org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer} - uses standard Java
     *         serialization framework.
     *     </li>
     *     <li>
     *         {@code org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer} - uses Kryo
     *         serialization framework.
     *     </li>
     * </ul>
     */
    BLOB,

    /**
     * Each field of the object is stored as a separate column of the corresponding type in Cassandra table.
     * Provides ability to utilize Cassandra secondary indexes for object fields.
     * <p>
     * Intended for objects which follow JavaBeans convention and have an empty public constructor.
     * Object fields should be:
     * <ul>
     *     <li>Primitive java types like int, long, String, etc.</li>
     *     <li>Collections of primitive java types like {@code List<Integer>}, {@code Map<Integer, String>},
     *     {@code Set<Long>}</li>
     * </ul>
     */
    POJO
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java deleted file mode 100644 index af569fd..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */

package org.apache.ignite.cache.store.cassandra.persistence;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Row;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
import org.w3c.dom.Element;

/**
 * Descriptor for particular field in a POJO object, specifying how this field
 * should be written to or loaded from Cassandra.
 */
public abstract class PojoField implements Serializable {
    /** Name attribute of XML element describing Pojo field. */
    private static final String NAME_ATTR = "name";

    /** Column attribute of XML element describing Pojo field. */
    private static final String COLUMN_ATTR = "column";

    /** Field name. */
    private String name;

    /** Java class to which the field belongs. */
    private Class javaCls;

    /** Field column name in Cassandra table. */
    private String col;

    /** Field column DDL. */
    private String colDDL;

    /** Field property descriptor. Transient: re-resolved lazily by {@link #propDesc()} when absent. */
    private transient PropertyDescriptor desc;

    /**
     * Creates instance of {@link PojoField} based on its description in XML element.
     * The column name defaults to the lower-cased field name when the {@code column} attribute is absent.
     *
     * @param el XML element describing Pojo field
     * @param pojoCls Pojo java class.
     * @throws IllegalArgumentException if the element is {@code null} or has no {@code name} attribute.
     */
    public PojoField(Element el, Class<?> pojoCls) {
        if (el == null)
            throw new IllegalArgumentException("DOM element representing POJO field object can't be null");

        if (!el.hasAttribute(NAME_ATTR)) {
            throw new IllegalArgumentException("DOM element representing POJO field object should have '"
                + NAME_ATTR + "' attribute");
        }

        this.name = el.getAttribute(NAME_ATTR).trim();
        this.col = el.hasAttribute(COLUMN_ATTR) ? el.getAttribute(COLUMN_ATTR).trim() : defaultColumn(name);

        init(PropertyMappingHelper.getPojoPropertyDescriptor(pojoCls, name));
    }

    /**
     * Creates instance of {@link PojoField} from its property descriptor.
     * <p>
     * The {@link QuerySqlField} annotation is looked up on the getter, or on the setter when there is no
     * getter. A non-blank annotation name is used as the column name, otherwise the lower-cased field
     * name is used.
     *
     * @param desc Field property descriptor.
     */
    public PojoField(PropertyDescriptor desc) {
        this.name = desc.getName();

        QuerySqlField sqlField = desc.getReadMethod() != null ?
            desc.getReadMethod().getAnnotation(QuerySqlField.class) :
            desc.getWriteMethod() == null ?
                null :
                desc.getWriteMethod().getAnnotation(QuerySqlField.class);

        // Annotation element values are never null, so a null check alone can't detect an annotation
        // without an explicit name; a blank name must fall back to the field name as well, otherwise
        // the column name (and the column DDL) would be blank.
        String annotatedCol = sqlField == null ? null : sqlField.name();

        this.col = annotatedCol != null && !annotatedCol.trim().isEmpty() ? annotatedCol : defaultColumn(name);

        init(desc);

        if (sqlField != null)
            init(sqlField);
    }

    /**
     * @return field name.
     */
    public String getName() {
        return name;
    }

    /**
     * @return Cassandra table column name.
     */
    public String getColumn() {
        return col;
    }

    /**
     * @return Cassandra table column DDL statement.
     */
    public String getColumnDDL() {
        return colDDL;
    }

    /**
     * Gets field value as an object having Cassandra compatible type,
     * so that it could be stored directly into Cassandra without any conversions.
     * Values with no Cassandra type mapping are converted using the serializer.
     *
     * @param obj Object instance.
     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
     * @return Object to store in Cassandra table column, {@code null} if the field value is {@code null}.
     * @throws IgniteException if the value can't be read or serialized.
     */
    public Object getValueFromObject(Object obj, Serializer serializer) {
        try {
            Object val = propDesc().getReadMethod().invoke(obj);

            if (val == null)
                return null;

            DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(val.getClass());

            if (cassandraType != null)
                return val;

            if (serializer == null) {
                throw new IllegalStateException("Can't serialize value from object '" +
                    val.getClass().getName() + "' field '" + name + "', cause there is no BLOB serializer specified");
            }

            return serializer.serialize(val);
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to get value of the field '" + name + "' from the instance " +
                " of '" + obj.getClass().toString() + "' class", e);
        }
    }

    /**
     * Sets object field value from a {@link com.datastax.driver.core.Row} returned by Cassandra CQL statement.
     *
     * @param row {@link com.datastax.driver.core.Row}
     * @param obj object which field should be populated from {@link com.datastax.driver.core.Row}
     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
     * @throws IgniteException if the setter invocation fails.
     */
    public void setValueFromRow(Row row, Object obj, Serializer serializer) {
        Object val = PropertyMappingHelper.getCassandraColumnValue(row, col, propDesc().getPropertyType(), serializer);

        try {
            propDesc().getWriteMethod().invoke(obj, val);
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to set value of the field '" + name + "' of the instance " +
                " of '" + obj.getClass().toString() + "' class", e);
        }
    }

    /**
     * Initializes field info from annotation.
     *
     * @param sqlField {@link QuerySqlField} annotation.
     */
    protected abstract void init(QuerySqlField sqlField);

    /**
     * Initializes field info from property descriptor: verifies that both accessor methods exist, makes
     * them accessible and derives the column DDL (BLOB is used when the property type has no Cassandra
     * type mapping).
     *
     * @param desc {@link PropertyDescriptor} descriptor.
     * @throws IllegalArgumentException if the property has no getter or no setter.
     */
    protected void init(PropertyDescriptor desc) {
        if (desc.getReadMethod() == null) {
            throw new IllegalArgumentException("Field '" + desc.getName() +
                "' of the class instance '" + desc.getPropertyType().getName() +
                "' doesn't provide getter method");
        }

        if (desc.getWriteMethod() == null) {
            throw new IllegalArgumentException("Field '" + desc.getName() +
                "' of POJO object instance of the class '" + desc.getPropertyType().getName() +
                "' doesn't provide write method");
        }

        if (!desc.getReadMethod().isAccessible())
            desc.getReadMethod().setAccessible(true);

        if (!desc.getWriteMethod().isAccessible())
            desc.getWriteMethod().setAccessible(true);

        DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(desc.getPropertyType());
        cassandraType = cassandraType == null ? DataType.Name.BLOB : cassandraType;

        this.javaCls = desc.getReadMethod().getDeclaringClass();
        this.desc = desc;
        this.colDDL = col + " " + cassandraType.toString();
    }

    /**
     * Returns property descriptor of the POJO field, resolving it again from the declaring class and
     * the field name when it is not available.
     *
     * @return Property descriptor
     */
    private PropertyDescriptor propDesc() {
        return desc != null ? desc : (desc = PropertyMappingHelper.getPojoPropertyDescriptor(javaCls, name));
    }

    /**
     * Derives the default column name from the field name using locale-independent lower-casing, so the
     * result doesn't depend on the JVM default locale (e.g. Turkish dotless i).
     *
     * @param fieldName POJO field name.
     * @return default Cassandra column name.
     */
    private static String defaultColumn(String fieldName) {
        return fieldName.toLowerCase(java.util.Locale.ROOT);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java deleted file mode 100644 index 4e86d74..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.persistence; - -import java.beans.PropertyDescriptor; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.w3c.dom.Element; - -/** - * Descriptor for Ignite key POJO class - */ -public class PojoKeyField extends PojoField { - - /** - * Specifies sort order for POJO key field - */ - public enum SortOrder { - /** Ascending sort order. */ - ASC, - /** Descending sort order. */ - DESC - } - - /** Xml attribute specifying sort order. */ - private static final String SORT_ATTR = "sort"; - - /** Sort order. */ - private SortOrder sortOrder = null; - - /** - * Constructs Ignite cache key POJO object descriptor. - * - * @param el xml configuration element. - * @param pojoCls java class of key POJO field. - */ - public PojoKeyField(Element el, Class pojoCls) { - super(el, pojoCls); - - if (el.hasAttribute(SORT_ATTR)) { - try { - sortOrder = SortOrder.valueOf(el.getAttribute(SORT_ATTR).trim().toUpperCase()); - } - catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Incorrect sort order '" + el.getAttribute(SORT_ATTR) + "' specified"); - } - } - } - - /** - * Constructs Ignite cache key POJO object descriptor. - * - * @param desc property descriptor. 
- */ - public PojoKeyField(PropertyDescriptor desc) { - super(desc); - } - - /** - * Returns sort order for the field. - * - * @return sort order. - */ - public SortOrder getSortOrder() { - return sortOrder; - } - - /** - * Initializes descriptor from {@link QuerySqlField} annotation. - * - * @param sqlField {@link QuerySqlField} annotation. - */ - protected void init(QuerySqlField sqlField) { - if (sqlField.descending()) - sortOrder = SortOrder.DESC; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java deleted file mode 100644 index c29f1db..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.persistence; - -import java.beans.PropertyDescriptor; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.w3c.dom.Element; - -/** - * Descriptor for Ignite value POJO class - */ -public class PojoValueField extends PojoField { - /** Xml attribute specifying that Cassandra column is static. */ - private static final String STATIC_ATTR = "static"; - - /** Xml attribute specifying that secondary index should be created for Cassandra column. */ - private static final String INDEX_ATTR = "index"; - - /** Xml attribute specifying secondary index custom class. */ - private static final String INDEX_CLASS_ATTR = "indexClass"; - - /** Xml attribute specifying secondary index options. */ - private static final String INDEX_OPTIONS_ATTR = "indexOptions"; - - /** Indicates if Cassandra column should be indexed. */ - private Boolean isIndexed; - - /** Custom java class for Cassandra secondary index. */ - private String idxCls; - - /** Secondary index options. */ - private String idxOptions; - - /** Indicates if Cassandra column is static. */ - private Boolean isStatic; - - /** - * Constructs Ignite cache value field descriptor. - * - * @param el field descriptor xml configuration element. - * @param pojoCls field java class - */ - public PojoValueField(Element el, Class pojoCls) { - super(el, pojoCls); - - if (el.hasAttribute(STATIC_ATTR)) - isStatic = Boolean.parseBoolean(el.getAttribute(STATIC_ATTR).trim().toLowerCase()); - - if (el.hasAttribute(INDEX_ATTR)) - isIndexed = Boolean.parseBoolean(el.getAttribute(INDEX_ATTR).trim().toLowerCase()); - - if (el.hasAttribute(INDEX_CLASS_ATTR)) - idxCls = el.getAttribute(INDEX_CLASS_ATTR).trim(); - - if (el.hasAttribute(INDEX_OPTIONS_ATTR)) { - idxOptions = el.getAttribute(INDEX_OPTIONS_ATTR).trim(); - - if (!idxOptions.toLowerCase().startsWith("with")) { - idxOptions = idxOptions.toLowerCase().startsWith("options") ? 
- "with " + idxOptions : - "with options = " + idxOptions; - } - } - } - - /** - * Constructs Ignite cache value field descriptor. - * - * @param desc field property descriptor. - */ - public PojoValueField(PropertyDescriptor desc) { - super(desc); - } - - /** - * Returns DDL for Cassandra columns corresponding to POJO field. - * - * @return columns DDL. - */ - public String getColumnDDL() { - String colDDL = super.getColumnDDL(); - - if (isStatic != null && isStatic) - colDDL = colDDL + " static"; - - return colDDL; - } - - /** - * Indicates if secondary index should be created for the field. - * - * @return true/false if secondary index should/shouldn't be created for the field. - */ - public boolean isIndexed() { - return isIndexed != null && isIndexed; - } - - /** - * Returns DDL for the field secondary index. - * - * @param keyspace Cassandra keyspace where index should be created. - * @param tbl Cassandra table for which secondary index should be created. - * - * @return secondary index DDL. - */ - public String getIndexDDL(String keyspace, String tbl) { - if (isIndexed == null || !isIndexed) - return null; - - StringBuilder builder = new StringBuilder(); - - if (idxCls != null) - builder.append("create custom index if not exists on ").append(keyspace).append(".").append(tbl); - else - builder.append("create index if not exists on ").append(keyspace).append(".").append(tbl); - - builder.append(" (").append(getColumn()).append(")"); - - if (idxCls != null) - builder.append(" using '").append(idxCls).append("'"); - - if (idxOptions != null) - builder.append(" ").append(idxOptions); - - return builder.append(";").toString(); - } - - /** - * Initializes descriptor from {@link QuerySqlField} annotation. - * - * @param sqlField {@link QuerySqlField} annotation. 
- */ - protected void init(QuerySqlField sqlField) { - if (sqlField.index()) - isIndexed = true; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java deleted file mode 100644 index 877167d..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.persistence; - -import java.beans.PropertyDescriptor; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -/** - * Stores persistence settings for Ignite cache value - */ -public class ValuePersistenceSettings extends PersistenceSettings { - /** XML element describing value field settings. */ - private static final String FIELD_ELEMENT = "field"; - - /** Value fields. */ - private List<PojoField> fields = new LinkedList<>(); - - /** - * Creates class instance from XML configuration. - * - * @param el XML element describing value persistence settings. - */ - public ValuePersistenceSettings(Element el) { - super(el); - - if (!PersistenceStrategy.POJO.equals(getStrategy())) - return; - - NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT); - - fields = detectFields(nodes); - - if (fields.isEmpty()) - throw new IllegalStateException("Failed to initialize value fields for class '" + getJavaClass().getName() + "'"); - - checkDuplicates(fields); - } - - /** - * @return List of value fields. - */ - public List<PojoField> getFields() { - return fields == null ? null : Collections.unmodifiableList(fields); - } - - /** {@inheritDoc} */ - @Override protected String defaultColumnName() { - return "value"; - } - - /** - * Extracts POJO fields from a list of corresponding XML field nodes. - * - * @param fieldNodes Field nodes to process. - * @return POJO fields list. 
- */ - private List<PojoField> detectFields(NodeList fieldNodes) { - List<PojoField> list = new LinkedList<>(); - - if (fieldNodes == null || fieldNodes.getLength() == 0) { - List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); - for (PropertyDescriptor descriptor : primitivePropDescriptors) - list.add(new PojoValueField(descriptor)); - - return list; - } - - List<PropertyDescriptor> allPropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), false); - - int cnt = fieldNodes.getLength(); - - for (int i = 0; i < cnt; i++) { - PojoValueField field = new PojoValueField((Element)fieldNodes.item(i), getJavaClass()); - - PropertyDescriptor desc = findPropertyDescriptor(allPropDescriptors, field.getName()); - - if (desc == null) { - throw new IllegalArgumentException("Specified POJO field '" + field.getName() + - "' doesn't exist in '" + getJavaClass().getName() + "' class"); - } - - list.add(field); - } - - return list; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java deleted file mode 100644 index 76d32fb..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains persistent settings configuration - */ -package org.apache.ignite.cache.store.cassandra.persistence; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java deleted file mode 100644 index e9f93a0..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.serializer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Serializer based on standard Java serialization. - */ -public class JavaSerializer implements Serializer { - /** Initial capacity of the in-memory stream used for serialization. */ - private static final int DFLT_BUFFER_SIZE = 4096; - - /** {@inheritDoc} */ - @Override public ByteBuffer serialize(Object obj) { - if (obj == null) - return null; - - ByteArrayOutputStream stream = null; - ObjectOutputStream out = null; - - try { - stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE); - - out = new ObjectOutputStream(stream); - out.writeObject(obj); - out.flush(); - - return ByteBuffer.wrap(stream.toByteArray()); - } - catch (IOException e) { - throw new IgniteException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e); - } - finally { - U.closeQuiet(out); - U.closeQuiet(stream); - } - } - - /** {@inheritDoc} */ - @Override public Object deserialize(ByteBuffer buf) { - ByteArrayInputStream stream = null; - ObjectInputStream in = null; - - try { - // Read only the readable window of the buffer: array() alone exposes the whole backing array - // and ignores the buffer's array offset, position and limit. - stream = new ByteArrayInputStream(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); - in = new ObjectInputStream(stream); - - return in.readObject(); - } - catch (Throwable e) { - throw new IgniteException("Failed to deserialize object from byte stream", e); - } - finally { - U.closeQuiet(in); -
U.closeQuiet(stream); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java deleted file mode 100644 index 88379de..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.serializer; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Serializer based on Kryo serialization.
- */ -public class KryoSerializer implements Serializer { - /** */ - private static final long serialVersionUID = 0L; - - /** Initial capacity of the in-memory stream used for serialization. */ - private static final int DFLT_BUFFER_SIZE = 4096; - - /** - * Thread local instance of {@link com.esotericsoftware.kryo.Kryo}. - * Kept static: a transient instance field is not re-initialized when this serializer is restored - * by Java deserialization, which would leave it {@code null}. - */ - private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { - @Override protected Kryo initialValue() { - return new Kryo(); - } - }; - - /** {@inheritDoc} */ - @Override public ByteBuffer serialize(Object obj) { - if (obj == null) - return null; - - ByteArrayOutputStream stream = null; - - Output out = null; - - try { - stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE); - - out = new Output(stream); - - kryos.get().writeClassAndObject(out, obj); - out.flush(); - - return ByteBuffer.wrap(stream.toByteArray()); - } - catch (Throwable e) { - throw new IgniteException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e); - } - finally { - U.closeQuiet(out); - U.closeQuiet(stream); - } - } - - /** {@inheritDoc} */ - @Override public Object deserialize(ByteBuffer buf) { - ByteArrayInputStream stream = null; - Input in = null; - - try { - // Read only the readable window of the buffer: array() alone exposes the whole backing array - // and ignores the buffer's array offset, position and limit. - stream = new ByteArrayInputStream(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); - in = new Input(stream); - - return kryos.get().readClassAndObject(in); - } - catch (Throwable e) { - throw new IgniteException("Failed to deserialize object from byte stream", e); - } - finally { - U.closeQuiet(in); - U.closeQuiet(stream); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java deleted file mode 100644 index 5b8d542..0000000 ---
a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.serializer; - -import java.io.Serializable; -import java.nio.ByteBuffer; - -/** - * Interface which should be implemented by all serializers responsible - * for writing/loading data to/from Cassandra in binary (BLOB) format. - */ -public interface Serializer extends Serializable { - /** - * Serializes object into byte buffer. - * - * @param obj Object to serialize. - * @return Byte buffer with binary data. - */ - public ByteBuffer serialize(Object obj); - - /** - * Deserializes object from byte buffer. - * - * @param buf Byte buffer. - * @return Deserialized object. 
- */ - public Object deserialize(ByteBuffer buf); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java deleted file mode 100644 index 4edd759..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * Contains serializers implementation, to store BLOBs into Cassandra - */ -package org.apache.ignite.cache.store.cassandra.serializer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java deleted file mode 100644 index e43db1d..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.session; - -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Row; -import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; - -/** - * Provides information for batch operations (loadAll, deleteAll, writeAll) of Ignite cache - * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. - * - * @param <R> type of the result returned from batch operation. - * @param <V> type of the value used in batch operation. - */ -public interface BatchExecutionAssistant<R, V> { - /** - * Indicates if Cassandra tables existence is required for this batch operation. - * - * @return {@code true} if table existence is required. - */ - public boolean tableExistenceRequired(); - - /** - * Returns unbound CQL statement to be executed inside batch operation. - * - * @return Unbound CQL statement. - */ - public String getStatement(); - - /** - * Binds prepared statement to current Cassandra session. - * - * @param statement Statement. - * @param obj Parameters for statement binding. - * @return Bound statement. - */ - public BoundStatement bindStatement(PreparedStatement statement, V obj); - - /** - * Returns Ignite cache key/value persistence settings. - * - * @return persistence settings. - */ - public KeyValuePersistenceSettings getPersistenceSettings(); - - /** - * Display name for the batch operation. - * - * @return Operation display name. - */ - public String operationName(); - - /** - * Processes particular row inside batch operation. - * - * @param row Row to process. - * @param seqNum Sequential number of the row. - */ - public void process(Row row, int seqNum); - - /** - * Checks if row/object with specified sequential number is already processed.
- * - * @param seqNum object sequential number - * @return {@code true} if object is already processed - */ - public boolean alreadyProcessed(int seqNum); - - /** - * @return number of processed objects/rows. - */ - public int processedCount(); - - /** - * @return batch operation result. - */ - public R processedData(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java deleted file mode 100644 index 387c98f..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.cache.store.cassandra.session; - -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Statement; - -/** - * Provides information for loadCache operation of {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. - */ -public interface BatchLoaderAssistant { - /** - * Returns name of the batch load operation. - * - * @return operation name. - */ - public String operationName(); - - /** - * Returns CQL statement to use in batch load operation. - * - * @return CQL statement for batch load operation. - */ - public Statement getStatement(); - - /** - * Processes each row returned by batch load operation. - * - * @param row row selected from Cassandra table. - */ - public void process(Row row); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java deleted file mode 100644 index 506982f..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session; - -import java.io.Closeable; - -/** - * Wrapper around Cassandra driver session, to automatically handle: - * <ul> - * <li>Keyspace and table absence exceptions</li> - * <li>Timeout exceptions</li> - * <li>Batch operations</li> - * </ul> - */ -public interface CassandraSession extends Closeable { - /** - * Execute single synchronous operation against Cassandra database. - * - * @param assistant execution assistance to perform the main operation logic. - * @param <V> type of the result returned from operation. - * - * @return result of the operation. - */ - public <V> V execute(ExecutionAssistant<V> assistant); - - /** - * Executes batch asynchronous operation against Cassandra database. - * - * @param assistant execution assistance to perform the main operation logic. - * @param data data which should be processed in batch operation. - * @param <R> type of the result returned from batch operation. - * @param <V> type of the value used in batch operation. - * - * @return result of the operation. - */ - public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data); - - /** - * Executes batch asynchronous operation to load bunch of records - * specified by CQL statement from Cassandra database - * - * @param assistant execution assistance to perform the main operation logic. - */ - public void execute(BatchLoaderAssistant assistant); -}