Update mapping reading to support mapping field names to bin names in the Aerospike module
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/b0c19774 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/b0c19774 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/b0c19774 Branch: refs/heads/master Commit: b0c197744e40816bd5a1780684e9f31081949790 Parents: 3900a41 Author: nishadi <ndime...@gmail.com> Authored: Mon Jun 19 17:35:44 2017 +0530 Committer: nishadi <ndime...@gmail.com> Committed: Mon Jun 19 17:35:44 2017 +0530 ---------------------------------------------------------------------- .../gora/aerospike/store/AerospikeMapping.java | 13 ++ .../store/AerospikeMappingBuilder.java | 165 ++++++++++--------- .../gora/aerospike/store/AerospikeStore.java | 76 ++++----- 3 files changed, 132 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/b0c19774/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java index 5df8a92..ae8284d 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java @@ -19,15 +19,20 @@ package org.apache.gora.aerospike.store; import com.aerospike.client.policy.Policy; import com.aerospike.client.policy.WritePolicy; +import java.util.HashMap; +import java.util.Map; + public class AerospikeMapping { private String namespace; private String set; private WritePolicy writePolicy; private Policy readPolicy; + private Map<String, String> binMapping; public AerospikeMapping() { writePolicy = new WritePolicy(); readPolicy = new Policy(); + binMapping = new HashMap<>(); } public 
String getNamespace() { @@ -61,4 +66,12 @@ public class AerospikeMapping { public void setReadPolicy(Policy readPolicy) { this.readPolicy = readPolicy; } + + public Map<String, String> getBinMapping() { + return binMapping; + } + + public void setBinMapping(Map<String, String> binMapping) { + this.binMapping = binMapping; + } } http://git-wip-us.apache.org/repos/asf/gora/blob/b0c19774/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java index 8744709..7b422f1 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java @@ -25,12 +25,14 @@ import org.jdom.Element; import org.jdom.input.SAXBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.jdom.JDOMException; import javax.naming.ConfigurationException; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.HashMap; public class AerospikeMappingBuilder { @@ -38,107 +40,108 @@ public class AerospikeMappingBuilder { private AerospikeMapping aerospikeMapping; - public AerospikeMappingBuilder(String mappingFile, Class<?> keyClass, Class<?> persistentClass) throws IOException { + public AerospikeMappingBuilder() throws IOException { this.aerospikeMapping = new AerospikeMapping(); - this.readMappingFile(mappingFile, keyClass, persistentClass); } public AerospikeMapping getAerospikeMapping() { return this.aerospikeMapping; } - private void readMappingFile(String fileName, Class<?> keyClass, Class<?> persistentClass) throws IOException { - try { - SAXBuilder saxBuilder = new 
SAXBuilder(); - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName); - if (inputStream == null) { - LOG.warn("Mapping file '" + fileName + "' could not be found!"); - throw new IOException("Mapping file '" + fileName + "' could not be found!"); - } - Document document = saxBuilder.build(inputStream); - if (document == null) { - LOG.warn("Mapping file '" + fileName + "' could not be found!"); - throw new IOException("Mapping file '" + fileName + "' could not be found!"); - } + public void readMappingFile(String mappingFile, Class<?> keyClass, Class<?> persistentClass) + throws IOException, JDOMException, ConfigurationException { - Element root = document.getRootElement(); - - // Mapping the defined policies - List<Element> policyElements = root.getChildren("policy"); - - for (Element policyElement : policyElements) { - - String policy = policyElement.getAttributeValue("name"); - if (policy != null) { - if (policy.equals("write")) { - WritePolicy writePolicy = new WritePolicy(); - if (policyElement.getAttributeValue("gen") != null) - writePolicy.generationPolicy = getGenerationPolicyMapping(policyElement.getAttributeValue - ("gen").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("exists") != null) - writePolicy.recordExistsAction = getRecordExistsAction(policyElement.getAttributeValue - ("exists").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("key") != null) - writePolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase - (Locale.getDefault())); - if (policyElement.getAttributeValue("retry") != null) - writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(policyElement.getAttributeValue - ("retry").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("timeout") != null) - writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); - aerospikeMapping.setWritePolicy(writePolicy); - } else if (policy.equals("read")) 
{ - Policy readPolicy = new Policy(); - if (policyElement.getAttributeValue("key") != null) - readPolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale - .getDefault())); - if (policyElement.getAttributeValue("timeout") != null) - readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); - aerospikeMapping.setReadPolicy(readPolicy); - } + SAXBuilder saxBuilder = new SAXBuilder(); + InputStream inputStream = getClass().getClassLoader().getResourceAsStream(mappingFile); + if (inputStream == null) { + LOG.warn("Mapping file '" + mappingFile + "' could not be found!"); + throw new IOException("Mapping file '" + mappingFile + "' could not be found!"); + } + Document document = saxBuilder.build(inputStream); + if (document == null) { + LOG.warn("Mapping file '" + mappingFile + "' could not be found!"); + throw new IOException("Mapping file '" + mappingFile + "' could not be found!"); + } + + Element root = document.getRootElement(); + + List<Element> policyElements = root.getChildren("policy"); + + for (Element policyElement : policyElements) { + + String policy = policyElement.getAttributeValue("name"); + if (policy != null) { + if (policy.equals("write")) { + WritePolicy writePolicy = new WritePolicy(); + if (policyElement.getAttributeValue("gen") != null) + writePolicy.generationPolicy = getGenerationPolicyMapping( + policyElement.getAttributeValue("gen").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("exists") != null) + writePolicy.recordExistsAction = getRecordExistsAction( + policyElement.getAttributeValue("exists").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("key") != null) + writePolicy.sendKey = getKeyUsagePolicy( + policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("retry") != null) + writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy( + 
policyElement.getAttributeValue("retry").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("timeout") != null) + writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); + aerospikeMapping.setWritePolicy(writePolicy); + } else if (policy.equals("read")) { + Policy readPolicy = new Policy(); + if (policyElement.getAttributeValue("key") != null) + readPolicy.sendKey = getKeyUsagePolicy( + policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("timeout") != null) + readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); + aerospikeMapping.setReadPolicy(readPolicy); } } + } + + List<Element> classElements = root.getChildren("class"); - // Mapping the defined classes - List<Element> classElements = root.getChildren("class"); + boolean persistentClassAndKeyClassMatches = false; + for (Element classElement : classElements) { - boolean persistentAndKeyClassMatches = false; - for (Element classElement : classElements) { + String mappingKeyClass = classElement.getAttributeValue("keyClass"); + String mappingClassName = classElement.getAttributeValue("name"); - String mappingKeyClass = classElement.getAttributeValue("keyClass"); - String mappingClassName = classElement.getAttributeValue("name"); + if (mappingKeyClass != null && mappingClassName != null) { + if (mappingKeyClass.equals(keyClass.getCanonicalName()) && mappingClassName + .equals(persistentClass.getCanonicalName())) { - if (mappingKeyClass != null && mappingClassName != null) { - if (mappingKeyClass.equals(keyClass.getCanonicalName()) - && mappingClassName.equals(persistentClass.getCanonicalName())) { + persistentClassAndKeyClassMatches = true; - persistentAndKeyClassMatches = true; + List<Element> fields = classElement.getChildren("field"); + Map<String, String> binMapping = new HashMap<>(); + for (Element field : fields) { + String fieldName = field.getAttributeValue("name"); + 
String binName = field.getAttributeValue("bin"); + if (fieldName != null && binName != null) + binMapping.put(fieldName, binName); + } + aerospikeMapping.setBinMapping(binMapping); - String nameSpace = classElement.getAttributeValue("namespace"); - if (nameSpace == null || nameSpace.isEmpty()) { - throw new ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for " + - "the class"); - } - aerospikeMapping.setNamespace(nameSpace); + String nameSpace = classElement.getAttributeValue("namespace"); + if (nameSpace == null || nameSpace.isEmpty()) { + throw new ConfigurationException( + "Gora-aerospike-mapping does not include the relevant namespace for " + + "the class"); + } + aerospikeMapping.setNamespace(nameSpace); - String set = classElement.getAttributeValue("set"); - if (set != null && !set.isEmpty()) { - aerospikeMapping.setSet(set); - } + String set = classElement.getAttributeValue("set"); + if (set != null && !set.isEmpty()) { + aerospikeMapping.setSet(set); } } - } - if (!persistentAndKeyClassMatches) - throw new ConfigurationException("Gora-aerospike-mapping does not include the name and keyClass in the " + - "databean"); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); } + if (!persistentClassAndKeyClassMatches) + throw new ConfigurationException( + "Gora-aerospike-mapping does not include the name and keyClass in the databean"); } private GenerationPolicy getGenerationPolicyMapping(String genPolicy) { http://git-wip-us.apache.org/repos/asf/gora/blob/b0c19774/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java index 
c8610e1..c452857 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java @@ -42,60 +42,59 @@ import org.slf4j.LoggerFactory; public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { public static final Logger LOG = LoggerFactory.getLogger(AerospikeStore.class); + private static final String PARSE_MAPPING_FILE_KEY = "gora.aerospike.mapping.file"; - private static final String DEFAULT_MAPPING_FILE = "gora-aerospike-mapping.xml"; + private static final String DEFAULT_MAPPING_FILE = "gora-aerospike-mapping.xml"; private AerospikeClient aerospikeClient; + private AerospikeParameters aerospikeParameters; - @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + @Override public void initialize(Class<K> keyClass, Class<T> persistentClass, + Properties properties) { super.initialize(keyClass, persistentClass, properties); try { - AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder(getConf().get - (PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass, - persistentClass); - aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), properties); + AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder(); + aerospikeMappingBuilder + .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), + keyClass, persistentClass); + aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), + properties); ClientPolicy policy = new ClientPolicy(); policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy(); policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy(); - aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(), aerospikeParameters.getPort()); + aerospikeClient = new 
AerospikeClient(aerospikeParameters.getHost(), + aerospikeParameters.getPort()); aerospikeParameters.setServerSpecificParameters(aerospikeClient); } catch (Exception e) { throw new RuntimeException(e); } } - @Override - public String getSchemaName() { + @Override public String getSchemaName() { return null; } - @Override - public void createSchema() { + @Override public void createSchema() { } - @Override - public void deleteSchema() { + @Override public void deleteSchema() { } - @Override - public boolean schemaExists() { + @Override public boolean schemaExists() { return true; } - @Override - public T get(K key, String[] fields) { + @Override public T get(K key, String[] fields) { return null; } - @Override - public void put(K key, T value) { + @Override public void put(K key, T value) { - Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), aerospikeParameters - .getAerospikeMapping().getSet(), Value.get(key)); + Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), + aerospikeParameters.getAerospikeMapping().getSet(), Value.get(key)); List<Field> fields = value.getSchema().getFields(); @@ -104,40 +103,35 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K // In retrieving the bin name, it is checked whether the server is single bin valued String binName = aerospikeParameters.getBinName(fields.get(i).name()); Bin bin = getBin(binName, value.get(i), fields.get(i)); - aerospikeClient.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin); + aerospikeClient + .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin); } } - @Override - public boolean delete(K key) { + @Override public boolean delete(K key) { return true; } - @Override - public long deleteByQuery(Query<K, T> query) { + @Override public long deleteByQuery(Query<K, T> query) { return 0; } - @Override - public Result<K, T> execute(Query<K, T> query) { + @Override 
public Result<K, T> execute(Query<K, T> query) { return null; } - @Override - public Query<K, T> newQuery() { + @Override public Query<K, T> newQuery() { return null; } - @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + @Override public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { return null; } public void flush() { } - @Override - public void close() { + @Override public void close() { aerospikeClient.close(); } @@ -148,17 +142,17 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * and returns the Bin * * @param binName name of the bin - * @param value value of the bin - * @param field field corresponding to bin + * @param value value of the bin + * @param field field corresponding to bin * @return */ - private Bin getBin(String binName, Object value, Field field){ - + private Bin getBin(String binName, Object value, Field field) { + boolean isStringType = false; if (field.schema().getType().equals(Schema.Type.STRING)) isStringType = true; - if (field.schema().getType().equals(Schema.Type.UNION)){ - for (Schema schema :field.schema().getTypes()) { + if (field.schema().getType().equals(Schema.Type.UNION)) { + for (Schema schema : field.schema().getTypes()) { if (schema.getName().equals("string")) isStringType = true; }