Add get functionality for the Aerospike module
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/00d392f2 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/00d392f2 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/00d392f2 Branch: refs/heads/master Commit: 00d392f2c71e4c6b3e1c8a2fefff0b30afd7c05a Parents: eae36f4 Author: nishadi <ndime...@gmail.com> Authored: Thu Jun 22 10:43:16 2017 +0530 Committer: nishadi <ndime...@gmail.com> Committed: Thu Jun 22 10:43:16 2017 +0530 ---------------------------------------------------------------------- .../gora/aerospike/store/AerospikeMapping.java | 6 +- .../gora/aerospike/store/AerospikeStore.java | 119 +++++++++++++++++-- 2 files changed, 115 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/00d392f2/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java index ae8284d..1a17241 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -74,4 +74,8 @@ public class AerospikeMapping { public void setBinMapping(Map<String, String> binMapping) { this.binMapping = binMapping; } + + public String getBinName(String fieldName){ + return binMapping.get(fieldName); + } } http://git-wip-us.apache.org/repos/asf/gora/blob/00d392f2/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java index df2f38d..653cbf8 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -28,7 +28,6 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; -import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.impl.DataStoreBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,7 @@ import org.slf4j.LoggerFactory; */ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { - public static final Logger LOG = LoggerFactory.getLogger(AerospikeStore.class); + public static final Logger logger = LoggerFactory.getLogger(AerospikeStore.class); private static final String PARSE_MAPPING_FILE_KEY = "gora.aerospike.mapping.file"; @@ -51,7 +50,17 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K private AerospikeParameters aerospikeParameters; - @Override public void initialize(Class<K> keyClass, Class<T> persistentClass, + /** + * {@inheritDoc} + * In initializing the aerospike datastore, read the mapping file, sets the basic + * aerospike specific parameters and creates the client with the user defined policies + * + * @param keyClass key class + * @param persistentClass persistent class + * @param properties properties + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { super.initialize(keyClass, persistentClass, properties); @@ -87,11 +96,36 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K return true; } - @Override public T get(K key, String[] fields) { - return null; + /** + * {@inheritDoc} + * + * @param key the key of the object + * @param fields the fields required in the object. 
Pass null, to retrieve all fields + * @return the Object corresponding to the key or null if it cannot be found + */ + @Override + public T get(K key, String[] fields) { + fields = getFieldsToQuery(fields); + Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), + aerospikeParameters.getAerospikeMapping().getSet(), Value.get(key)); + Record record = aerospikeClient + .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields); + if (record == null) { + return null; + } + return createPersistentInstance(record, fields); } - @Override public void put(K key, T persistent) { + /** + * Method to insert the persistent objects with the given key to the aerospike database server. + * In writing the records, the policy defined in the mapping file is used to decide on the + * behaviour of transaction handling. + * + * @param key key of the object + * @param persistent object to be persisted + */ + @Override + public void put(K key, T persistent) { Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), aerospikeParameters.getAerospikeMapping().getSet(), Value.get(key)); @@ -131,10 +165,15 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K return null; } + @Override public void flush() { } - @Override public void close() { + /** + * Method to close aerospike client connections to database server nodes + */ + @Override + public void close() { aerospikeClient.close(); } @@ -143,11 +182,12 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * This method provides those utf8 valued bin values as strings * for aerospike Value to obtain the corresponding bin value, * and returns the Bin + * Bin is the concept in Aerospike equivalent to a column in RDBMS * * @param binName name of the bin * @param value value of the bin * @param field field corresponding to bin - * @return + * @return Bin */ private Bin getBin(String binName, Object value, 
Field field) { @@ -166,4 +206,65 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K else return new Bin(binName, Value.get(value)); } + + /** + * Method to create a persistent object given the retrieved record + * from Aerospike database + * + * @param record record retrieved from database + * @param fields fields + * @return persistent object created + */ + private T createPersistentInstance(Record record, String[] fields) { + + T persistent = newPersistent(); + for (String field : fields) { + setPersistentField(field, record, persistent); + } + return persistent; + } + + /** + * Method to set a field in the persistent object + * + * @param field field name + * @param record record retrieved from database + * @param persistent persistent object for the field to be set + */ + private void setPersistentField(String field, Record record, T persistent) { + + String binName = aerospikeParameters.getAerospikeMapping().getBinName(field); + if (binName == null) { + throw new RuntimeException("Aerospike mapping for field [" + field + "] not found. 
" + + "Wrong gora-aerospike-mapping.xml?"); + } + Schema.Type fieldDataType = fieldMap.get(field).schema().getType(); + String binDataType = record.bins.get(field).getClass().getSimpleName(); + Object binValue = record.bins.get(binName); + + if (fieldDataType.toString().equalsIgnoreCase(binDataType)) { + persistent.put(field, record.bins.get(binName)); + } else { + switch (fieldDataType) { + case UNION: + Schema unionSchema = fieldMap.get(field).schema(); + if (unionSchema.getTypes().size() == 2) { + Schema.Type type0 = unionSchema.getTypes().get(0).getType(); + Schema.Type type1 = unionSchema.getTypes().get(1).getType(); + + if ((type0.equals(Schema.Type.NULL)) || (type1.equals(Schema.Type.NULL))) { + persistent.put(field, binValue); + } + } + break; + case INT: + if (binDataType.equalsIgnoreCase("long")) { + persistent.put(field, Math.toIntExact((Long) binValue)); + } + break; + default: + break; + } + } + } }