http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java new file mode 100644 index 0000000..393dbe4 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import java.beans.PropertyDescriptor; +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * Stores persistence settings for Ignite cache key + */ +public class KeyPersistenceSettings extends PersistenceSettings { + /** Partition key XML tag. */ + private static final String PARTITION_KEY_ELEMENT = "partitionKey"; + + /** Cluster key XML tag. */ + private static final String CLUSTER_KEY_ELEMENT = "clusterKey"; + + /** POJO field XML tag. */ + private static final String FIELD_ELEMENT = "field"; + + /** POJO fields. */ + private List<PojoField> fields = new LinkedList<>(); + + /** Partition key fields. */ + private List<PojoField> partKeyFields = new LinkedList<>(); + + /** Cluster key fields. */ + private List<PojoField> clusterKeyFields = new LinkedList<>(); + + /** + * Creates key persistence settings object based on it's XML configuration. + * + * @param el XML element storing key persistence settings + */ + public KeyPersistenceSettings(Element el) { + super(el); + + if (!PersistenceStrategy.POJO.equals(getStrategy())) + return; + + NodeList keyElem = el.getElementsByTagName(PARTITION_KEY_ELEMENT); + + Element partKeysNode = keyElem != null ? (Element) keyElem.item(0) : null; + + Element clusterKeysNode = el.getElementsByTagName(CLUSTER_KEY_ELEMENT) != null ? 
+ (Element)el.getElementsByTagName(CLUSTER_KEY_ELEMENT).item(0) : null; + + if (partKeysNode == null && clusterKeysNode != null) { + throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " + + "doesn't specify partition key mappings"); + } + + partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors()); + + if (partKeyFields == null || partKeyFields.isEmpty()) { + throw new IllegalStateException("Failed to initialize partition key fields for class '" + + getJavaClass().getName() + "'"); + } + + clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields)); + + fields = new LinkedList<>(); + fields.addAll(partKeyFields); + fields.addAll(clusterKeyFields); + + checkDuplicates(fields); + } + + /** {@inheritDoc} */ + @Override public List<PojoField> getFields() { + return fields; + } + + /** + * Returns Cassandra DDL for primary key. + * + * @return DDL statement. + */ + public String getPrimaryKeyDDL() { + StringBuilder partKey = new StringBuilder(); + + List<String> cols = getPartitionKeyColumns(); + for (String column : cols) { + if (partKey.length() != 0) + partKey.append(", "); + + partKey.append(column); + } + + StringBuilder clusterKey = new StringBuilder(); + + cols = getClusterKeyColumns(); + if (cols != null) { + for (String column : cols) { + if (clusterKey.length() != 0) + clusterKey.append(", "); + + clusterKey.append(column); + } + } + + return clusterKey.length() == 0 ? + " primary key ((" + partKey.toString() + "))" : + " primary key ((" + partKey.toString() + "), " + clusterKey.toString() + ")"; + } + + /** + * Returns Cassandra DDL for cluster key. + * + * @return Cluster key DDL. 
+ */ + public String getClusteringDDL() { + StringBuilder builder = new StringBuilder(); + + for (PojoField field : clusterKeyFields) { + PojoKeyField.SortOrder sortOrder = ((PojoKeyField)field).getSortOrder(); + + if (sortOrder == null) + continue; + + if (builder.length() != 0) + builder.append(", "); + + boolean asc = PojoKeyField.SortOrder.ASC.equals(sortOrder); + + builder.append(field.getColumn()).append(" ").append(asc ? "asc" : "desc"); + } + + return builder.length() == 0 ? null : "clustering order by (" + builder.toString() + ")"; + } + + /** {@inheritDoc} */ + @Override protected String defaultColumnName() { + return "key"; + } + + /** + * Returns partition key columns of Cassandra table. + * + * @return List of column names. + */ + private List<String> getPartitionKeyColumns() { + List<String> cols = new LinkedList<>(); + + if (PersistenceStrategy.BLOB.equals(getStrategy()) || PersistenceStrategy.PRIMITIVE.equals(getStrategy())) { + cols.add(getColumn()); + return cols; + } + + if (partKeyFields != null) { + for (PojoField field : partKeyFields) + cols.add(field.getColumn()); + } + + return cols; + } + + /** + * Returns cluster key columns of Cassandra table. + * + * @return List of column names. + */ + private List<String> getClusterKeyColumns() { + List<String> cols = new LinkedList<>(); + + if (clusterKeyFields != null) { + for (PojoField field : clusterKeyFields) + cols.add(field.getColumn()); + } + + return cols; + } + + /** + * Extracts POJO fields specified in XML element. + * + * @param el XML element describing fields. + * @param descriptors POJO fields descriptors. + * @return List of {@code This} fields. 
+ */ + private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) { + List<PojoField> list = new LinkedList<>(); + + if (el == null && (descriptors == null || descriptors.isEmpty())) + return list; + + if (el == null) { + for (PropertyDescriptor descriptor : descriptors) + list.add(new PojoKeyField(descriptor)); + + return list; + } + + NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT); + + int cnt = nodes == null ? 0 : nodes.getLength(); + + if (cnt == 0) { + throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " + + "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" + + CLUSTER_KEY_ELEMENT + "' element"); + } + + for (int i = 0; i < cnt; i++) { + PojoKeyField field = new PojoKeyField((Element)nodes.item(i), getJavaClass()); + + PropertyDescriptor desc = findPropertyDescriptor(descriptors, field.getName()); + + if (desc == null) { + throw new IllegalArgumentException("Specified POJO field '" + field.getName() + + "' doesn't exist in '" + getJavaClass().getName() + "' class"); + } + + list.add(field); + } + + return list; + } + + /** + * @return POJO field descriptors for partition key. + */ + private List<PropertyDescriptor> getPartitionKeyDescriptors() { + List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), + AffinityKeyMapped.class, true); + + return primitivePropDescriptors != null && !primitivePropDescriptors.isEmpty() ? + primitivePropDescriptors : + PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); + } + + /** + * @return POJO field descriptors for cluster key. 
+ */ + private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) { + List<PropertyDescriptor> primitivePropDescriptors = + PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); + + if (primitivePropDescriptors == null || primitivePropDescriptors.isEmpty() || + partKeyFields.size() == primitivePropDescriptors.size()) + return null; + + for (PojoField field : partKeyFields) { + for (int i = 0; i < primitivePropDescriptors.size(); i++) { + if (primitivePropDescriptors.get(i).getName().equals(field.getName())) { + primitivePropDescriptors.remove(i); + break; + } + } + } + + return primitivePropDescriptors; + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java new file mode 100644 index 0000000..2c43ed4 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.io.StringReader; +import java.util.LinkedList; +import java.util.List; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.store.cassandra.common.SystemHelper; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.springframework.core.io.Resource; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; + +/** + * Stores persistence settings for Ignite cache key and value + */ +public class KeyValuePersistenceSettings implements Serializable { + /** + * Default Cassandra keyspace options which should be used to create new keyspace. + * <ul> + * <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/> + * If your Cassandra cluster deployed across multiple data centers it's better to use <b>NetworkTopologyStrategy</b>. + * </li> + * <li> Three replicas will be created for each data block. </li> + * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li> + * </ul> + */ + private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " + + "'replication_factor' : 3} and durable_writes = true"; + + /** Xml attribute specifying Cassandra keyspace to use. */ + private static final String KEYSPACE_ATTR = "keyspace"; + + /** Xml attribute specifying Cassandra table to use. */ + private static final String TABLE_ATTR = "table"; + + /** Xml attribute specifying ttl (time to leave) for rows inserted in Cassandra. 
*/ + private static final String TTL_ATTR = "ttl"; + + /** Root xml element containing persistence settings specification. */ + private static final String PERSISTENCE_NODE = "persistence"; + + /** Xml element specifying Cassandra keyspace options. */ + private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions"; + + /** Xml element specifying Cassandra table options. */ + private static final String TABLE_OPTIONS_NODE = "tableOptions"; + + /** Xml element specifying Ignite cache key persistence settings. */ + private static final String KEY_PERSISTENCE_NODE = "keyPersistence"; + + /** Xml element specifying Ignite cache value persistence settings. */ + private static final String VALUE_PERSISTENCE_NODE = "valuePersistence"; + + /** TTL (time to leave) for rows inserted into Cassandra table {@link <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>}. */ + private Integer ttl; + + /** Cassandra keyspace (analog of tablespace in relational databases). */ + private String keyspace; + + /** Cassandra table. */ + private String tbl; + + /** Cassandra table creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>}. */ + private String tblOptions; + + /** Cassandra keyspace creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>}. */ + private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS; + + /** Persistence settings for Ignite cache keys. */ + private KeyPersistenceSettings keyPersistenceSettings; + + /** Persistence settings for Ignite cache values. */ + private ValuePersistenceSettings valPersistenceSettings; + + /** + * Constructs Ignite cache key/value persistence settings. 
+ * + * @param settings string containing xml with persistence settings for Ignite cache key/value + */ + @SuppressWarnings("UnusedDeclaration") + public KeyValuePersistenceSettings(String settings) { + init(settings); + } + + /** + * Constructs Ignite cache key/value persistence settings. + * + * @param settingsFile xml file with persistence settings for Ignite cache key/value + */ + @SuppressWarnings("UnusedDeclaration") + public KeyValuePersistenceSettings(File settingsFile) { + InputStream in; + + try { + in = new FileInputStream(settingsFile); + } + catch (IOException e) { + throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " + + settingsFile.getAbsolutePath(), e); + } + + init(loadSettings(in)); + } + + /** + * Constructs Ignite cache key/value persistence settings. + * + * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value + */ + public KeyValuePersistenceSettings(Resource settingsRsrc) { + InputStream in; + + try { + in = settingsRsrc.getInputStream(); + } + catch (IOException e) { + throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e); + } + + init(loadSettings(in)); + } + + /** + * Returns ttl to use for while inserting new rows into Cassandra table. + * + * @return ttl + */ + public Integer getTTL() { + return ttl; + } + + /** + * Returns Cassandra keyspace to use. + * + * @return keyspace. + */ + public String getKeyspace() { + return keyspace; + } + + /** + * Returns Cassandra table to use. + * + * @return table. + */ + public String getTable() { + return tbl; + } + + /** + * Returns full name of Cassandra table to use (including keyspace). + * + * @return full table name in format "keyspace.table". + */ + public String getTableFullName() + { + return keyspace + "." + tbl; + } + + /** + * Returns persistence settings for Ignite cache keys. + * + * @return keys persistence settings. 
+ */ + public KeyPersistenceSettings getKeyPersistenceSettings() { + return keyPersistenceSettings; + } + + /** + * Returns persistence settings for Ignite cache values. + * + * @return values persistence settings. + */ + public ValuePersistenceSettings getValuePersistenceSettings() { + return valPersistenceSettings; + } + + /** + * Returns list of POJO fields to be mapped to Cassandra table columns. + * + * @return POJO fields list. + */ + @SuppressWarnings("UnusedDeclaration") + public List<PojoField> getFields() { + List<PojoField> fields = new LinkedList<>(); + + for (PojoField field : keyPersistenceSettings.getFields()) + fields.add(field); + + for (PojoField field : valPersistenceSettings.getFields()) + fields.add(field); + + return fields; + } + + /** + * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns. + * + * @return POJO fields list. + */ + @SuppressWarnings("UnusedDeclaration") + public List<PojoField> getKeyFields() { + return keyPersistenceSettings.getFields(); + } + + /** + * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns. + * + * @return POJO fields list. + */ + @SuppressWarnings("UnusedDeclaration") + public List<PojoField> getValueFields() { + return valPersistenceSettings.getFields(); + } + + /** + * Returns DDL statement to create Cassandra keyspace. + * + * @return Keyspace DDL statement. + */ + public String getKeyspaceDDLStatement() { + StringBuilder builder = new StringBuilder(); + builder.append("create keyspace if not exists ").append(keyspace); + + if (keyspaceOptions != null) { + if (!keyspaceOptions.trim().toLowerCase().startsWith("with")) + builder.append("\nwith"); + + builder.append(" ").append(keyspaceOptions); + } + + String statement = builder.toString().trim().replaceAll(" +", " "); + + return statement.endsWith(";") ? statement : statement + ";"; + } + + /** + * Returns DDL statement to create Cassandra table. + * + * @return Table DDL statement. 
+ */ + public String getTableDDLStatement() { + String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL(); + + String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL(); + + String clusteringDDL = keyPersistenceSettings.getClusteringDDL(); + + String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : ""; + + if (clusteringDDL != null && !clusteringDDL.isEmpty()) + optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL; + + if (!optionsDDL.trim().isEmpty()) + optionsDDL = optionsDDL.trim().toLowerCase().startsWith("with") ? optionsDDL.trim() : "with " + optionsDDL.trim(); + + StringBuilder builder = new StringBuilder(); + + builder.append("create table if not exists ").append(keyspace).append(".").append(tbl); + builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)"); + + if (!optionsDDL.isEmpty()) + builder.append(" \n").append(optionsDDL); + + String tblDDL = builder.toString().trim().replaceAll(" +", " "); + + return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";"; + } + + /** + * Returns DDL statements to create Cassandra table secondary indexes. + * + * @return DDL statements to create secondary indexes. + */ + public List<String> getIndexDDLStatements() { + List<String> idxDDLs = new LinkedList<>(); + + List<PojoField> fields = valPersistenceSettings.getFields(); + + for (PojoField field : fields) { + if (((PojoValueField)field).isIndexed()) + idxDDLs.add(((PojoValueField)field).getIndexDDL(keyspace, tbl)); + } + + return idxDDLs; + } + + /** + * Loads Ignite cache persistence settings from resource. + * + * @param in Input stream. + * @return String containing xml with Ignite cache persistence settings. 
+ */ + private String loadSettings(InputStream in) { + StringBuilder settings = new StringBuilder(); + BufferedReader reader = null; + + try { + reader = new BufferedReader(new InputStreamReader(in)); + + String line = reader.readLine(); + + while (line != null) { + if (settings.length() != 0) + settings.append(SystemHelper.LINE_SEPARATOR); + + settings.append(line); + + line = reader.readLine(); + } + } + catch (Throwable e) { + throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e); + } + finally { + U.closeQuiet(reader); + U.closeQuiet(in); + } + + return settings.toString(); + } + + /** + * @param elem Element with data. + * @param attr Attribute name. + * @return Numeric value for specified attribute. + */ + private int extractIntAttribute(Element elem, String attr) { + String val = elem.getAttribute(attr).trim(); + + try { + return Integer.parseInt(val); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute"); + } + } + + /** + * Initializes persistence settings from XML string. + * + * @param settings XML string containing Ignite cache persistence settings configuration. + */ + @SuppressWarnings("IfCanBeSwitch") + private void init(String settings) { + Document doc; + + try { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + doc = builder.parse(new InputSource(new StringReader(settings))); + } + catch (Throwable e) { + throw new IllegalArgumentException("Failed to parse persistence settings:" + + SystemHelper.LINE_SEPARATOR + settings, e); + } + + Element root = doc.getDocumentElement(); + + if (!PERSISTENCE_NODE.equals(root.getNodeName())) { + throw new IllegalArgumentException("Incorrect persistence settings specified. 
" + + "Root XML element should be 'persistence'"); + } + + if (!root.hasAttribute(KEYSPACE_ATTR)) { + throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR + + "' attribute should be specified"); + } + + if (!root.hasAttribute(TABLE_ATTR)) { + throw new IllegalArgumentException("Incorrect persistence settings '" + TABLE_ATTR + + "' attribute should be specified"); + } + + keyspace = root.getAttribute(KEYSPACE_ATTR).trim(); + tbl = root.getAttribute(TABLE_ATTR).trim(); + + if (root.hasAttribute(TTL_ATTR)) + ttl = extractIntAttribute(root, TTL_ATTR); + + if (!root.hasChildNodes()) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "there are no key and value persistence settings specified"); + } + + NodeList children = root.getChildNodes(); + int cnt = children.getLength(); + + for (int i = 0; i < cnt; i++) { + Node node = children.item(i); + + if (node.getNodeType() != Node.ELEMENT_NODE) + continue; + + Element el = (Element)node; + String nodeName = el.getNodeName(); + + if (nodeName.equals(TABLE_OPTIONS_NODE)) { + tblOptions = el.getTextContent(); + tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " "); + } + else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) { + keyspaceOptions = el.getTextContent(); + keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " "); + } + else if (nodeName.equals(KEY_PERSISTENCE_NODE)) + keyPersistenceSettings = new KeyPersistenceSettings(el); + else if (nodeName.equals(VALUE_PERSISTENCE_NODE)) + valPersistenceSettings = new ValuePersistenceSettings(el); + } + + if (keyPersistenceSettings == null) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "there are no key persistence settings specified"); + } + + if (valPersistenceSettings == null) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "there 
are no value persistence settings specified"); + } + + List<PojoField> keyFields = keyPersistenceSettings.getFields(); + List<PojoField> valFields = valPersistenceSettings.getFields(); + + if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) && + (keyFields == null || keyFields.isEmpty())) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "there are no key fields found"); + } + + if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) && + (valFields == null || valFields.isEmpty())) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "there are no value fields found"); + } + + if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty()) + return; + + for (PojoField keyField : keyFields) { + for (PojoField valField : valFields) { + if (keyField.getColumn().equals(valField.getColumn())) { + throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + + "key column '" + keyField.getColumn() + "' also specified as a value column"); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java new file mode 100644 index 0000000..e734ca3 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; +import org.apache.ignite.cache.store.cassandra.serializer.Serializer; + +/** + * Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes. + * Handles all the mappings to/from Java classes into Cassandra and responsible for all the details + * of how Java objects should be written/loaded to/from Cassandra. + */ +public class PersistenceController { + /** Ignite cache key/value persistence settings. */ + private KeyValuePersistenceSettings persistenceSettings; + + /** CQL statement to insert row into Cassandra table. */ + private String writeStatement; + + /** CQL statement to delete row from Cassandra table. */ + private String delStatement; + + /** CQL statement to select value fields from Cassandra table. */ + private String loadStatement; + + /** CQL statement to select key/value fields from Cassandra table. 
*/ + private String loadStatementWithKeyFields; + + /** + * Constructs persistence controller from Ignite cache persistence settings. + * + * @param settings persistence settings. + */ + public PersistenceController(KeyValuePersistenceSettings settings) { + if (settings == null) + throw new IllegalArgumentException("Persistent settings can't be null"); + + this.persistenceSettings = settings; + } + + /** + * Returns Ignite cache persistence settings. + * + * @return persistence settings. + */ + public KeyValuePersistenceSettings getPersistenceSettings() { + return persistenceSettings; + } + + /** + * Returns Cassandra keyspace to use. + * + * @return keyspace. + */ + public String getKeyspace() { + return persistenceSettings.getKeyspace(); + } + + /** + * Returns Cassandra table to use. + * + * @return table. + */ + public String getTable() { + return persistenceSettings.getTable(); + } + + /** + * Returns CQL statement to insert row into Cassandra table. + * + * @return CQL statement. + */ + public String getWriteStatement() { + if (writeStatement != null) + return writeStatement; + + List<String> cols = getKeyValueColumns(); + + StringBuilder colsList = new StringBuilder(); + StringBuilder questionsList = new StringBuilder(); + + for (String column : cols) { + if (colsList.length() != 0) { + colsList.append(", "); + questionsList.append(","); + } + + colsList.append(column); + questionsList.append("?"); + } + + writeStatement = "insert into " + persistenceSettings.getKeyspace() + "." + persistenceSettings.getTable() + " (" + + colsList.toString() + ") values (" + questionsList.toString() + ")"; + + if (persistenceSettings.getTTL() != null) + writeStatement += " using ttl " + persistenceSettings.getTTL(); + + writeStatement += ";"; + + return writeStatement; + } + + /** + * Returns CQL statement to delete row from Cassandra table. + * + * @return CQL statement. 
+ */ + public String getDeleteStatement() { + if (delStatement != null) + return delStatement; + + List<String> cols = getKeyColumns(); + + StringBuilder statement = new StringBuilder(); + + for (String column : cols) { + if (statement.length() != 0) + statement.append(" and "); + + statement.append(column).append("=?"); + } + + statement.append(";"); + + delStatement = "delete from " + + persistenceSettings.getKeyspace() + "." + + persistenceSettings.getTable() + " where " + + statement.toString(); + + return delStatement; + } + + /** + * Returns CQL statement to select key/value fields from Cassandra table. + * + * @param includeKeyFields whether to include/exclude key fields from the returned row. + * + * @return CQL statement. + */ + public String getLoadStatement(boolean includeKeyFields) { + if (loadStatement != null && loadStatementWithKeyFields != null) + return includeKeyFields ? loadStatementWithKeyFields : loadStatement; + + List<String> valCols = getValueColumns(); + + List<String> keyCols = getKeyColumns(); + + StringBuilder hdrWithKeyFields = new StringBuilder("select "); + + for (int i = 0; i < keyCols.size(); i++) { + if (i > 0) + hdrWithKeyFields.append(", "); + + hdrWithKeyFields.append(keyCols.get(i)); + } + + StringBuilder hdr = new StringBuilder("select "); + + for (int i = 0; i < valCols.size(); i++) { + if (i > 0) + hdr.append(", "); + + hdrWithKeyFields.append(","); + + hdr.append(valCols.get(i)); + hdrWithKeyFields.append(valCols.get(i)); + } + + StringBuilder statement = new StringBuilder(); + + statement.append(" from "); + statement.append(persistenceSettings.getKeyspace()); + statement.append(".").append(persistenceSettings.getTable()); + statement.append(" where "); + + for (int i = 0; i < keyCols.size(); i++) { + if (i > 0) + statement.append(" and "); + + statement.append(keyCols.get(i)).append("=?"); + } + + statement.append(";"); + + loadStatement = hdr.toString() + statement.toString(); + loadStatementWithKeyFields = 
hdrWithKeyFields.toString() + statement.toString(); + + return includeKeyFields ? loadStatementWithKeyFields : loadStatement; + } + + /** + * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}. + * + * @param statement statement to which key object should be bind. + * @param key key object. + * + * @return statement with bounded key. + */ + public BoundStatement bindKey(PreparedStatement statement, Object key) { + KeyPersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings(); + + Object[] values = getBindingValues(settings.getStrategy(), + settings.getSerializer(), settings.getFields(), key); + + return statement.bind(values); + } + + /** + * Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}. + * + * @param statement statement to which key and value object should be bind. + * @param key key object. + * @param val value object. + * + * @return statement with bounded key and value. + */ + public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) { + KeyPersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings(); + Object[] keyValues = getBindingValues(keySettings.getStrategy(), + keySettings.getSerializer(), keySettings.getFields(), key); + + ValuePersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings(); + Object[] valValues = getBindingValues(valSettings.getStrategy(), + valSettings.getSerializer(), valSettings.getFields(), val); + + Object[] values = new Object[keyValues.length + valValues.length]; + + int i = 0; + + for (Object keyVal : keyValues) { + values[i] = keyVal; + i++; + } + + for (Object valVal : valValues) { + values[i] = valVal; + i++; + } + + return statement.bind(values); + } + + /** + * Builds Ignite cache key object from returned Cassandra table row. + * + * @param row Cassandra table row. + * + * @return key object. 
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Object buildKeyObject(Row row) {
+        return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
+    }
+
+    /**
+     * Builds Ignite cache value object from Cassandra table row.
+     *
+     * @param row Cassandra table row.
+     *
+     * @return value object, or {@code null} if {@code row} is {@code null}.
+     */
+    public Object buildValueObject(Row row) {
+        return buildObject(row, persistenceSettings.getValuePersistenceSettings());
+    }
+
+    /**
+     * Builds object from Cassandra table row according to the persistence strategy of the
+     * given settings: PRIMITIVE reads a single column as the configured Java class, BLOB
+     * deserializes the bytes of a single column, and any other strategy instantiates the
+     * configured Java class and populates it field by field.
+     *
+     * @param row Cassandra table row.
+     * @param settings persistence settings to use.
+     *
+     * @return object, or {@code null} if {@code row} is {@code null}.
+     */
+    private Object buildObject(Row row, PersistenceSettings settings) {
+        if (row == null)
+            return null;
+
+        PersistenceStrategy stgy = settings.getStrategy();
+
+        Class clazz = settings.getJavaClass();
+
+        String col = settings.getColumn();
+
+        List<PojoField> fields = settings.getFields();
+
+        // PRIMITIVE: the whole object lives in one column; no serializer is passed.
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);
+
+        // BLOB: the whole object is the deserialized content of one column.
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return settings.getSerializer().deserialize(row.getBytes(col));
+
+        Object obj;
+
+        // Remaining strategy: create an empty instance through the no-argument constructor.
+        try {
+            obj = clazz.newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
+        }
+
+        // Each field descriptor copies its own column value from the row into the instance.
+        for (PojoField field : fields)
+            field.setValueFromRow(row, obj, settings.getSerializer());
+
+        return obj;
+    }
+
+    /**
+     * Extracts field values from POJO object and converts them into Java types
+     * which could be mapped to Cassandra types.
+     *
+     * @param stgy persistence strategy to use.
+     * @param serializer serializer to use for BLOBs.
+     * @param fields fields whose values should be extracted.
+     * @param obj object instance whose field values should be extracted.
+ * + * @return array of object field values converted into Java object instances having Cassandra compatible types + */ + private Object[] getBindingValues(PersistenceStrategy stgy, Serializer serializer, List<PojoField> fields, Object obj) { + if (PersistenceStrategy.PRIMITIVE.equals(stgy)) { + if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null || + obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) { + throw new IllegalArgumentException("Couldn't deserialize instance of class '" + + obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case."); + } + + return new Object[] {obj}; + } + + if (PersistenceStrategy.BLOB.equals(stgy)) + return new Object[] {serializer.serialize(obj)}; + + Object[] values = new Object[fields.size()]; + + int i = 0; + + for (PojoField field : fields) { + Object val = field.getValueFromObject(obj, serializer); + + if (val instanceof byte[]) + val = ByteBuffer.wrap((byte[]) val); + + values[i] = val; + + i++; + } + + return values; + } + + /** + * Returns list of Cassandra table columns mapped to Ignite cache key and value fields + * + * @return list of column names + */ + private List<String> getKeyValueColumns() { + List<String> cols = getKeyColumns(); + + cols.addAll(getValueColumns()); + + return cols; + } + + /** + * Returns list of Cassandra table columns mapped to Ignite cache key fields + * + * @return list of column names + */ + private List<String> getKeyColumns() { + return getColumns(persistenceSettings.getKeyPersistenceSettings()); + } + + /** + * Returns list of Cassandra table columns mapped to Ignite cache value fields + * + * @return list of column names + */ + private List<String> getValueColumns() { + return getColumns(persistenceSettings.getValuePersistenceSettings()); + } + + /** + * Returns list of Cassandra table columns based on persistence strategy to use + * + * @return list of column names + */ + private List<String> 
getColumns(PersistenceSettings settings) { + List<String> cols = new LinkedList<>(); + + if (!PersistenceStrategy.POJO.equals(settings.getStrategy())) { + cols.add(settings.getColumn()); + return cols; + } + + for (PojoField field : settings.getFields()) + cols.add(field.getColumn()); + + return cols; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java new file mode 100644 index 0000000..20d790a --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Stores persistence settings, which describe how a particular key/value
+ * from Ignite cache should be stored in Cassandra.
+ */
+public abstract class PersistenceSettings implements Serializable {
+    /** XML attribute specifying persistence strategy. */
+    private static final String STRATEGY_ATTR = "strategy";
+
+    /** XML attribute specifying Cassandra column name. */
+    private static final String COLUMN_ATTR = "column";
+
+    /** XML attribute specifying BLOB serializer to use. */
+    private static final String SERIALIZER_ATTR = "serializer";
+
+    /** XML attribute specifying java class of the object to be persisted. */
+    private static final String CLASS_ATTR = "class";
+
+    /** Persistence strategy to use. */
+    private PersistenceStrategy stgy;
+
+    /** Java class of the object to be persisted. */
+    private Class javaCls;
+
+    /** Cassandra table column name where object should be persisted in
+     * case of using BLOB or PRIMITIVE persistence strategy. */
+    private String col;
+
+    /** Serializer for BLOBs; a {@link JavaSerializer} unless replaced. */
+    private Serializer serializer = new JavaSerializer();
+
+    /**
+     * Extracts property descriptor from the descriptors list by its name.
+     *
+     * @param descriptors descriptors list.
+     * @param propName property name.
+     *
+     * @return property descriptor.
+ */ + public static PropertyDescriptor findPropertyDescriptor(List<PropertyDescriptor> descriptors, String propName) { + if (descriptors == null || descriptors.isEmpty() || propName == null || propName.trim().isEmpty()) + return null; + + for (PropertyDescriptor descriptor : descriptors) { + if (descriptor.getName().equals(propName)) + return descriptor; + } + + return null; + } + + /** + * Constructs persistence settings from corresponding XML element. + * + * @param el xml element containing persistence settings configuration. + */ + @SuppressWarnings("unchecked") + public PersistenceSettings(Element el) { + if (el == null) + throw new IllegalArgumentException("DOM element representing key/value persistence object can't be null"); + + if (!el.hasAttribute(STRATEGY_ATTR)) { + throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" + + STRATEGY_ATTR + "' attribute"); + } + + try { + stgy = PersistenceStrategy.valueOf(el.getAttribute(STRATEGY_ATTR).trim().toUpperCase()); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Incorrect persistence strategy specified: " + el.getAttribute(STRATEGY_ATTR)); + } + + if (!el.hasAttribute(CLASS_ATTR) && !PersistenceStrategy.BLOB.equals(stgy)) { + throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" + + CLASS_ATTR + "' attribute or have BLOB persistence strategy"); + } + + try { + javaCls = el.hasAttribute(CLASS_ATTR) ? 
getClassInstance(el.getAttribute(CLASS_ATTR).trim()) : null; + } + catch (Throwable e) { + throw new IllegalArgumentException("Incorrect java class specified '" + el.getAttribute(CLASS_ATTR) + "' " + + "for Cassandra persistence", e); + } + + if (!PersistenceStrategy.BLOB.equals(stgy) && + (ByteBuffer.class.equals(javaCls) || byte[].class.equals(javaCls))) { + throw new IllegalArgumentException("Java class '" + el.getAttribute(CLASS_ATTR) + "' " + + "specified could only be persisted using BLOB persistence strategy"); + } + + if (PersistenceStrategy.PRIMITIVE.equals(stgy) && + PropertyMappingHelper.getCassandraType(javaCls) == null) { + throw new IllegalArgumentException("Current implementation doesn't support persisting '" + + javaCls.getName() + "' object using PRIMITIVE strategy"); + } + + if (PersistenceStrategy.POJO.equals(stgy)) { + if (javaCls == null) + throw new IllegalStateException("Object java class should be specified for POJO persistence strategy"); + + try { + javaCls.getConstructor(); + } + catch (Throwable e) { + throw new IllegalArgumentException("Java class '" + javaCls.getName() + "' couldn't be used as POJO " + + "cause it doesn't have no arguments constructor", e); + } + } + + if (el.hasAttribute(COLUMN_ATTR)) { + if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.PRIMITIVE.equals(stgy)) { + throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " + + "'" + COLUMN_ATTR + "' attribute is only applicable for PRIMITIVE or BLOB strategy"); + } + + col = el.getAttribute(COLUMN_ATTR).trim(); + } + + if (el.hasAttribute(SERIALIZER_ATTR)) { + if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.POJO.equals(stgy)) { + throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " + + "'" + SERIALIZER_ATTR + "' attribute is only applicable for BLOB and POJO strategies"); + } + + Object obj = 
newObjectInstance(el.getAttribute(SERIALIZER_ATTR).trim()); + + if (!(obj instanceof Serializer)) { + throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " + + "serializer class '" + el.getAttribute(SERIALIZER_ATTR) + "' doesn't implement '" + + Serializer.class.getName() + "' interface"); + } + + serializer = (Serializer)obj; + } + + if ((PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy)) && col == null) + col = defaultColumnName(); + } + + /** + * Returns java class of the object to be persisted. + * + * @return java class. + */ + public Class getJavaClass() { + return javaCls; + } + + /** + * Returns persistence strategy to use. + * + * @return persistence strategy. + */ + public PersistenceStrategy getStrategy() { + return stgy; + } + + /** + * Returns Cassandra table column name where object should be persisted in + * case of using BLOB or PRIMITIVE persistence strategy. + * + * @return column name. + */ + public String getColumn() { + return col; + } + + /** + * Returns serializer to be used for BLOBs. + * + * @return serializer. + */ + public Serializer getSerializer() { + return serializer; + } + + /** + * Returns list of POJO fields to be persisted. + * + * @return list of fields. + */ + public abstract List<PojoField> getFields(); + + /** + * Returns Cassandra table columns DDL, corresponding to POJO fields which should be persisted. 
+     *
+     * @return DDL statement for Cassandra table fields
+     */
+    public String getTableColumnsDDL() {
+        // Single-column strategies: one column, typed as blob or as the mapped Cassandra type.
+        if (PersistenceStrategy.BLOB.equals(stgy))
+            return " " + col + " " + DataType.Name.BLOB.toString();
+
+        if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+            return " " + col + " " + PropertyMappingHelper.getCassandraType(javaCls);
+
+        StringBuilder builder = new StringBuilder();
+
+        // Otherwise emit one DDL fragment per POJO field, separated by ",\n".
+        for (PojoField field : getFields()) {
+            if (builder.length() > 0)
+                builder.append(",\n");
+
+            builder.append(" ").append(field.getColumnDDL());
+        }
+
+        if (builder.length() == 0) {
+            throw new IllegalStateException("There are no POJO fields found for '" + javaCls.toString()
+                + "' class to be presented as a Cassandra primary key");
+        }
+
+        return builder.toString();
+    }
+
+    /**
+     * Returns default name for Cassandra column (if it's not specified explicitly).
+     *
+     * @return column name
+     */
+    protected abstract String defaultColumnName();
+
+    /**
+     * Checks if there are POJO fields with the same name or the same Cassandra column specified in
+     * persistence settings.
+     *
+     * @param fields list of fields to be persisted into Cassandra
+     *
+     * @throws IllegalArgumentException if two fields share a name or a column.
+     */
+    protected void checkDuplicates(List<PojoField> fields) {
+        if (fields == null || fields.isEmpty())
+            return;
+
+        // Every field is compared against the whole list, itself included: the first match
+        // is the field itself, a second match means a duplicate.
+        for (PojoField field1 : fields) {
+            boolean sameNames = false;
+            boolean sameCols = false;
+
+            for (PojoField field2 : fields) {
+                if (field1.getName().equals(field2.getName())) {
+                    if (sameNames) {
+                        throw new IllegalArgumentException("Incorrect Cassandra key persistence settings, " +
+                            "two POJO fields with the same name '" + field1.getName() + "' specified");
+                    }
+
+                    sameNames = true;
+                }
+
+                if (field1.getColumn().equals(field2.getColumn())) {
+                    if (sameCols) {
+                        throw new IllegalArgumentException("Incorrect Cassandra persistence settings, " +
+                            "two POJO fields with the same column '" + field1.getColumn() + "' specified");
+                    }
+
+                    sameCols = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Instantiates Class object for particular class. The lookup is attempted, in order, through
+     * the default {@link Class#forName(String)}, the thread context class loader, the class loader
+     * of {@link PersistenceSettings} and the system class loader.
+     *
+     * @param clazz class name
+     * @return Class object
+     */
+    private Class getClassInstance(String clazz) {
+        try {
+            return Class.forName(clazz);
+        }
+        catch (ClassNotFoundException ignored) {
+            // Fall through to the next class loader.
+        }
+
+        try {
+            return Class.forName(clazz, true, Thread.currentThread().getContextClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+            // Fall through to the next class loader.
+        }
+
+        try {
+            return Class.forName(clazz, true, PersistenceSettings.class.getClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+            // Fall through to the next class loader.
+        }
+
+        try {
+            return Class.forName(clazz, true, ClassLoader.getSystemClassLoader());
+        }
+        catch (ClassNotFoundException ignored) {
+            // No loader could resolve the class; reported below.
+        }
+
+        throw new IgniteException("Failed to load class '" + clazz + "' using reflection");
+    }
+
+    /**
+     * Creates new object instance of particular class using its no-argument constructor.
+     *
+     * @param clazz class name
+     * @return object
+     */
+    private Object newObjectInstance(String clazz) {
+        try {
+            return getClassInstance(clazz).newInstance();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to instantiate class '" + clazz + "' using default constructor", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
new file mode 100644
index 0000000..4b1e2d8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+/**
+ * Describes persistence strategy to be used to persist object data into Cassandra.
+ */
+public enum PersistenceStrategy {
+    /**
+     * Stores object value as is, by mapping its value to Cassandra table column with corresponding type.
+     * <p>
+     * Could be used for primitive java types (like Integer, String, Long, etc.) which could be directly mapped
+     * to appropriate Cassandra types.
+     */
+    PRIMITIVE,
+
+    /**
+     * Stores object value as BLOB, by mapping its value to Cassandra table column with blob type.
+     * Could be used for any java type. Conversion of java object to BLOB is handled by specified serializer.
+     * <p>
+     * Available serializer implementations:
+     * <ul>
+     *     <li>
+     *         org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer - uses standard Java
+     *         serialization framework.
+     *     </li>
+     *     <li>
+     *         org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer - uses Kryo serialization
+     *         framework.
+     *     </li>
+     * </ul>
+     */
+    BLOB,
+
+    /**
+     * Stores each field of an object as a column having corresponding type in Cassandra table.
+     * Provides ability to utilize Cassandra secondary indexes for object fields.
+     * <p>
+     * Could be used for objects which follow JavaBeans convention and have an empty public constructor.
+     * Object fields should be:
+     * <ul>
+     *     <li>Primitive java types like int, long, String, etc.</li>
+     *     <li>Collections of primitive java types like {@code List<Integer>}, {@code Map<Integer, String>},
+     *     {@code Set<Long>}</li>
+     * </ul>
+     */
+    POJO
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
new file mode 100644
index 0000000..af569fd
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Row; +import java.beans.PropertyDescriptor; +import java.io.Serializable; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; +import org.apache.ignite.cache.store.cassandra.serializer.Serializer; +import org.w3c.dom.Element; + +/** + * Descriptor for particular field in a POJO object, specifying how this field + * should be written to or loaded from Cassandra. + */ +public abstract class PojoField implements Serializable { + /** Name attribute of XML element describing Pojo field. */ + private static final String NAME_ATTR = "name"; + + /** Column attribute of XML element describing Pojo field. */ + private static final String COLUMN_ATTR = "column"; + + /** Field name. */ + private String name; + + /** Java class to which the field belongs. */ + private Class javaCls; + + /** Field column name in Cassandra table. */ + private String col; + + /** Field column DDL. */ + private String colDDL; + + /** Field property descriptor. */ + private transient PropertyDescriptor desc; + + /** + * Creates instance of {@link PojoField} based on it's description in XML element. + * + * @param el XML element describing Pojo field + * @param pojoCls Pojo java class. + */ + public PojoField(Element el, Class<?> pojoCls) { + if (el == null) + throw new IllegalArgumentException("DOM element representing POJO field object can't be null"); + + if (!el.hasAttribute(NAME_ATTR)) { + throw new IllegalArgumentException("DOM element representing POJO field object should have '" + + NAME_ATTR + "' attribute"); + } + + this.name = el.getAttribute(NAME_ATTR).trim(); + this.col = el.hasAttribute(COLUMN_ATTR) ? 
el.getAttribute(COLUMN_ATTR).trim() : name.toLowerCase(); + + init(PropertyMappingHelper.getPojoPropertyDescriptor(pojoCls, name)); + } + + /** + * Creates instance of {@link PojoField} from its property descriptor. + * + * @param desc Field property descriptor. + */ + public PojoField(PropertyDescriptor desc) { + this.name = desc.getName(); + + QuerySqlField sqlField = desc.getReadMethod() != null ? + desc.getReadMethod().getAnnotation(QuerySqlField.class) : + desc.getWriteMethod() == null ? + null : + desc.getWriteMethod().getAnnotation(QuerySqlField.class); + + this.col = sqlField != null && sqlField.name() != null ? sqlField.name() : name.toLowerCase(); + + init(desc); + + if (sqlField != null) + init(sqlField); + } + + /** + * @return field name. + */ + public String getName() { + return name; + } + + /** + * @return Cassandra table column name. + */ + public String getColumn() { + return col; + } + + /** + * @return Cassandra table column DDL statement. + */ + public String getColumnDDL() { + return colDDL; + } + + /** + * Gets field value as an object having Cassandra compatible type. + * This it could be stored directly into Cassandra without any conversions. + * + * @param obj Object instance. + * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use. + * @return Object to store in Cassandra table column. 
+     */
+    public Object getValueFromObject(Object obj, Serializer serializer) {
+        try {
+            Object val = propDesc().getReadMethod().invoke(obj);
+
+            if (val == null)
+                return null;
+
+            DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(val.getClass());
+
+            // Values whose class maps to a Cassandra type are returned unchanged.
+            if (cassandraType != null)
+                return val;
+
+            // Anything else has to go through the BLOB serializer.
+            if (serializer == null) {
+                throw new IllegalStateException("Can't serialize value from object '" +
+                    val.getClass().getName() + "' field '" + name + "', cause there is no BLOB serializer specified");
+            }
+
+            return serializer.serialize(val);
+        }
+        // Note: this also wraps the IllegalStateException thrown above.
+        catch (Throwable e) {
+            throw new IgniteException("Failed to get value of the field '" + name + "' from the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Sets object field value from a {@link com.datastax.driver.core.Row} returned by Cassandra CQL statement.
+     * The column value is read before the setter is invoked, so a failure while reading the
+     * column is not wrapped into the {@link IgniteException} raised for setter failures.
+     *
+     * @param row {@link com.datastax.driver.core.Row}
+     * @param obj object which field should be populated from {@link com.datastax.driver.core.Row}
+     * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+     */
+    public void setValueFromRow(Row row, Object obj, Serializer serializer) {
+        Object val = PropertyMappingHelper.getCassandraColumnValue(row, col, propDesc().getPropertyType(), serializer);
+
+        try {
+            propDesc().getWriteMethod().invoke(obj, val);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to set value of the field '" + name + "' of the instance " +
+                " of '" + obj.getClass().toString() + "' class", e);
+        }
+    }
+
+    /**
+     * Initializes field info from annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected abstract void init(QuerySqlField sqlField);
+
+    /**
+     * Initializes field info from property descriptor.
+     *
+     * @param desc {@link PropertyDescriptor} descriptor.
+ */ + protected void init(PropertyDescriptor desc) { + if (desc.getReadMethod() == null) { + throw new IllegalArgumentException("Field '" + desc.getName() + + "' of the class instance '" + desc.getPropertyType().getName() + + "' doesn't provide getter method"); + } + + if (desc.getWriteMethod() == null) { + throw new IllegalArgumentException("Field '" + desc.getName() + + "' of POJO object instance of the class '" + desc.getPropertyType().getName() + + "' doesn't provide write method"); + } + + if (!desc.getReadMethod().isAccessible()) + desc.getReadMethod().setAccessible(true); + + if (!desc.getWriteMethod().isAccessible()) + desc.getWriteMethod().setAccessible(true); + + DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(desc.getPropertyType()); + cassandraType = cassandraType == null ? DataType.Name.BLOB : cassandraType; + + this.javaCls = desc.getReadMethod().getDeclaringClass(); + this.desc = desc; + this.colDDL = col + " " + cassandraType.toString(); + } + + /** + * Returns property descriptor of the POJO field + * + * @return Property descriptor + */ + private PropertyDescriptor propDesc() { + return desc != null ? 
desc : (desc = PropertyMappingHelper.getPojoPropertyDescriptor(javaCls, name)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java new file mode 100644 index 0000000..4e86d74 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import java.beans.PropertyDescriptor; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.w3c.dom.Element; + +/** + * Descriptor for Ignite key POJO class + */ +public class PojoKeyField extends PojoField { + + /** + * Specifies sort order for POJO key field + */ + public enum SortOrder { + /** Ascending sort order. 
*/ + ASC, + /** Descending sort order. */ + DESC + } + + /** Xml attribute specifying sort order. */ + private static final String SORT_ATTR = "sort"; + + /** Sort order. */ + private SortOrder sortOrder = null; + + /** + * Constructs Ignite cache key POJO object descriptor. + * + * @param el xml configuration element. + * @param pojoCls java class of key POJO field. + */ + public PojoKeyField(Element el, Class pojoCls) { + super(el, pojoCls); + + if (el.hasAttribute(SORT_ATTR)) { + try { + sortOrder = SortOrder.valueOf(el.getAttribute(SORT_ATTR).trim().toUpperCase()); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Incorrect sort order '" + el.getAttribute(SORT_ATTR) + "' specified"); + } + } + } + + /** + * Constructs Ignite cache key POJO object descriptor. + * + * @param desc property descriptor. + */ + public PojoKeyField(PropertyDescriptor desc) { + super(desc); + } + + /** + * Returns sort order for the field. + * + * @return sort order. + */ + public SortOrder getSortOrder() { + return sortOrder; + } + + /** + * Initializes descriptor from {@link QuerySqlField} annotation. + * + * @param sqlField {@link QuerySqlField} annotation. 
+ */ + protected void init(QuerySqlField sqlField) { + if (sqlField.descending()) + sortOrder = SortOrder.DESC; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java new file mode 100644 index 0000000..c29f1db --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import java.beans.PropertyDescriptor; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.w3c.dom.Element; + +/** + * Descriptor for Ignite value POJO class + */ +public class PojoValueField extends PojoField { + /** Xml attribute specifying that Cassandra column is static. 
*/
+    private static final String STATIC_ATTR = "static";
+
+    /** Xml attribute specifying that secondary index should be created for Cassandra column. */
+    private static final String INDEX_ATTR = "index";
+
+    /** Xml attribute specifying secondary index custom class. */
+    private static final String INDEX_CLASS_ATTR = "indexClass";
+
+    /** Xml attribute specifying secondary index options. */
+    private static final String INDEX_OPTIONS_ATTR = "indexOptions";
+
+    /** Indicates if Cassandra column should be indexed; {@code null} when not specified. */
+    private Boolean isIndexed;
+
+    /** Custom java class for Cassandra secondary index. */
+    private String idxCls;
+
+    /** Secondary index options, normalized to start with "with". */
+    private String idxOptions;
+
+    /** Indicates if Cassandra column is static; {@code null} when not specified. */
+    private Boolean isStatic;
+
+    /**
+     * Constructs Ignite cache value field descriptor.
+     *
+     * @param el field descriptor xml configuration element.
+     * @param pojoCls field java class
+     */
+    public PojoValueField(Element el, Class pojoCls) {
+        super(el, pojoCls);
+
+        if (el.hasAttribute(STATIC_ATTR))
+            isStatic = Boolean.parseBoolean(el.getAttribute(STATIC_ATTR).trim().toLowerCase());
+
+        if (el.hasAttribute(INDEX_ATTR))
+            isIndexed = Boolean.parseBoolean(el.getAttribute(INDEX_ATTR).trim().toLowerCase());
+
+        if (el.hasAttribute(INDEX_CLASS_ATTR))
+            idxCls = el.getAttribute(INDEX_CLASS_ATTR).trim();
+
+        if (el.hasAttribute(INDEX_OPTIONS_ATTR)) {
+            idxOptions = el.getAttribute(INDEX_OPTIONS_ATTR).trim();
+
+            // Normalize the options so they always start with "with": a value already starting
+            // with "options" only gets the "with " prefix, anything else gets "with options = ".
+            if (!idxOptions.toLowerCase().startsWith("with")) {
+                idxOptions = idxOptions.toLowerCase().startsWith("options") ?
+                    "with " + idxOptions :
+                    "with options = " + idxOptions;
+            }
+        }
+    }
+
+    /**
+     * Constructs Ignite cache value field descriptor.
+     *
+     * @param desc field property descriptor.
+     */
+    public PojoValueField(PropertyDescriptor desc) {
+        super(desc);
+    }
+
+    /**
+     * Returns DDL for Cassandra columns corresponding to POJO field.
+     *
+     * @return columns DDL.
+ */ + public String getColumnDDL() { + String colDDL = super.getColumnDDL(); + + if (isStatic != null && isStatic) + colDDL = colDDL + " static"; + + return colDDL; + } + + /** + * Indicates if secondary index should be created for the field. + * + * @return true/false if secondary index should/shouldn't be created for the field. + */ + public boolean isIndexed() { + return isIndexed != null && isIndexed; + } + + /** + * Returns DDL for the field secondary index. + * + * @param keyspace Cassandra keyspace where index should be created. + * @param tbl Cassandra table for which secondary index should be created. + * + * @return secondary index DDL. + */ + public String getIndexDDL(String keyspace, String tbl) { + if (isIndexed == null || !isIndexed) + return null; + + StringBuilder builder = new StringBuilder(); + + if (idxCls != null) + builder.append("create custom index if not exists on ").append(keyspace).append(".").append(tbl); + else + builder.append("create index if not exists on ").append(keyspace).append(".").append(tbl); + + builder.append(" (").append(getColumn()).append(")"); + + if (idxCls != null) + builder.append(" using '").append(idxCls).append("'"); + + if (idxOptions != null) + builder.append(" ").append(idxOptions); + + return builder.append(";").toString(); + } + + /** + * Initializes descriptor from {@link QuerySqlField} annotation. + * + * @param sqlField {@link QuerySqlField} annotation. 
+ */ + protected void init(QuerySqlField sqlField) { + if (sqlField.index()) + isIndexed = true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java new file mode 100644 index 0000000..877167d --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.cassandra.persistence; + +import java.beans.PropertyDescriptor; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * Stores persistence settings for Ignite cache value + */ +public class ValuePersistenceSettings extends PersistenceSettings { + /** XML element describing value field settings. */ + private static final String FIELD_ELEMENT = "field"; + + /** Value fields. */ + private List<PojoField> fields = new LinkedList<>(); + + /** + * Creates class instance from XML configuration. + * + * @param el XML element describing value persistence settings. + */ + public ValuePersistenceSettings(Element el) { + super(el); + + if (!PersistenceStrategy.POJO.equals(getStrategy())) + return; + + NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT); + + fields = detectFields(nodes); + + if (fields.isEmpty()) + throw new IllegalStateException("Failed to initialize value fields for class '" + getJavaClass().getName() + "'"); + + checkDuplicates(fields); + } + + /** + * @return List of value fields. + */ + public List<PojoField> getFields() { + return fields == null ? null : Collections.unmodifiableList(fields); + } + + /** {@inheritDoc} */ + @Override protected String defaultColumnName() { + return "value"; + } + + /** + * Extracts POJO fields from a list of corresponding XML field nodes. + * + * @param fieldNodes Field nodes to process. + * @return POJO fields list. 
+ */ + private List<PojoField> detectFields(NodeList fieldNodes) { + List<PojoField> list = new LinkedList<>(); + + if (fieldNodes == null || fieldNodes.getLength() == 0) { + List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true); + for (PropertyDescriptor descriptor : primitivePropDescriptors) + list.add(new PojoValueField(descriptor)); + + return list; + } + + List<PropertyDescriptor> allPropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), false); + + int cnt = fieldNodes.getLength(); + + for (int i = 0; i < cnt; i++) { + PojoValueField field = new PojoValueField((Element)fieldNodes.item(i), getJavaClass()); + + PropertyDescriptor desc = findPropertyDescriptor(allPropDescriptors, field.getName()); + + if (desc == null) { + throw new IllegalArgumentException("Specified POJO field '" + field.getName() + + "' doesn't exist in '" + getJavaClass().getName() + "' class"); + } + + list.add(field); + } + + return list; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java new file mode 100644 index 0000000..76d32fb --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains persistent settings configuration + */ +package org.apache.ignite.cache.store.cassandra.persistence; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java new file mode 100644 index 0000000..44d2d47 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.serializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Serializer based on standard Java serialization. + */ +public class JavaSerializer implements Serializer { + /** */ + private static final int DFLT_BUFFER_SIZE = 4096; + + /** {@inheritDoc} */ + @Override public ByteBuffer serialize(Object obj) { + if (obj == null) + return null; + + ByteArrayOutputStream stream = null; + ObjectOutputStream out = null; + + try { + stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE); + + out = new ObjectOutputStream(stream); + out.writeObject(obj); + out.flush(); + + return ByteBuffer.wrap(stream.toByteArray()); + } + catch (IOException e) { + throw new IllegalStateException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e); + } + finally { + U.closeQuiet(out); + U.closeQuiet(stream); + } + } + + /** {@inheritDoc} */ + @Override public Object deserialize(ByteBuffer buf) { + ByteArrayInputStream stream = null; + ObjectInputStream in = null; + + try { + stream = new ByteArrayInputStream(buf.array()); + in = new ObjectInputStream(stream); + + return in.readObject(); + } + catch (Throwable e) { + throw new IllegalStateException("Failed to deserialize object from byte stream", e); + } + finally { + U.closeQuiet(in); + U.closeQuiet(stream); + } + } +} 
/**
 * Contract to be implemented by every serializer responsible for
 * writing/loading data to/from Cassandra in binary (BLOB) format.
 */
public interface Serializer extends Serializable {
    /**
     * Converts an object into its binary form.
     *
     * @param obj Object to serialize.
     * @return Byte buffer with binary data.
     */
    ByteBuffer serialize(Object obj);

    /**
     * Restores an object from its binary form.
     *
     * @param buf Byte buffer.
     * @return Deserialized object.
     */
    Object deserialize(ByteBuffer buf);
}