Update put and get functionality to support different Avro data types
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/27b015e5
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/27b015e5
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/27b015e5

Branch: refs/heads/master
Commit: 27b015e5dfdb67f8530e7c110d36aa55d0e42cb1
Parents: 7c3390d
Author: nishadi <ndime...@gmail.com>
Authored: Mon Jun 26 18:35:02 2017 +0530
Committer: nishadi <ndime...@gmail.com>
Committed: Mon Jun 26 18:35:02 2017 +0530

----------------------------------------------------------------------
 .../gora/aerospike/store/AerospikeStore.java    | 351 +++++++++++++++++++----
 1 file changed, 287 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/27b015e5/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index 9ff3c60..5a880d6 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -17,18 +17,31 @@ package org.apache.gora.aerospike.store;
 
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.Properties;
 
-import com.aerospike.client.*;
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Record;
+import com.aerospike.client.AerospikeClient;
 import com.aerospike.client.policy.ClientPolicy;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,19 +93,41 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
     LOG.info("Aerospike Gora datastore initialized successfully.");
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema creation through the
+   * provided Java client. When the records are added to the database, the schema is created on
+   * the fly. Thus, schema related functionality is unavailable in gora-aerospike module.
+   *
+   * @return null
+   */
   @Override
   public String getSchemaName() {
     return null;
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema creation through the
+   * provided Java client. When the records are added to the database, the schema is created on
+   * the fly. Thus, schema creation functionality is unavailable in gora-aerospike module.
+   */
   @Override
   public void createSchema() {
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema creation through the
+   * provided Java client. When the records are added to the database, the schema is created on
+   * the fly. Thus, schema deletion functionality is unavailable in gora-aerospike module.
+   */
   @Override
   public void deleteSchema() {
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema creation through the
+   * provided Java client. When the records are added to the database, the schema is created on
+   * the fly. Thus, the schema is treated as always existing and this method returns {@code true}.
+   */
   @Override
   public boolean schemaExists() {
     return true;
@@ -107,9 +142,17 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
    */
   @Override
   public T get(K key, String[] fields) {
+
+    Value keyValue;
+    if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+      keyValue = Value.get(key.toString());
+    } else {
+      keyValue = Value.get(key);
+    }
+
     fields = getFieldsToQuery(fields);
     Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
-        aerospikeParameters.getAerospikeMapping().getSet(), Value.get(key));
+        aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
     Record record = aerospikeClient
         .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields);
     if (record == null) {
@@ -129,23 +172,39 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
   @Override
   public void put(K key, T persistent) {
 
+    Value keyValue;
+    if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+      keyValue = Value.get(key.toString());
+    } else {
+      keyValue = Value.get(key);
+    }
+
     Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
-        aerospikeParameters.getAerospikeMapping().getSet(), Value.get(key));
+        aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
     List<Field> fields = persistent.getSchema().getFields();
+    List<Bin> bins = new ArrayList<>();
 
     for (int i = 0; i < fields.size(); i++) {
-      String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping()
-          .get(fields.get(i).name());
-      if (mappingBinName == null) {
-        LOG.error("Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?",
-            persistent.getClass().getName(), fields.get(i).name());
-        throw new RuntimeException(
-            "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields
-                .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?");
+      Object persistentValue = persistent.get(i);
+      if (persistentValue != null) {
+        String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping()
+            .get(fields.get(i).name());
+        if (mappingBinName == null) {
+          LOG.error(
+              "Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?",
+              persistent.getClass().getName(), fields.get(i).name());
+          throw new RuntimeException(
+              "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields
+                  .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?");
+        }
+        bins.add(new Bin(mappingBinName,
+            getSerializableValue(persistentValue, fields.get(i).schema())));
       }
-      Bin bin = getBin(mappingBinName, persistent.get(i), fields.get(i));
-      aerospikeClient
-          .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin);
     }
+    // Single write: atomic for the whole record and safe under a REPLACE write policy.
+    if (!bins.isEmpty()) {
+      aerospikeClient.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey,
+          bins.toArray(new Bin[0]));
+    }
   }
 
@@ -188,31 +247,56 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
   }
 
   /**
-   * Aerospike does not support Utf8 format returned from Avro. This method provides those utf8
-   * valued bin (column) values as strings for aerospike Value to obtain the corresponding bin
-   * value, and returns the Bin (column in RDBMS)
+   * Converts an Avro field value into an Aerospike {@link Value} that can be stored in a bin.
    *
-   * @param binName name of the bin
-   * @param value value of the bin
-   * @param field field corresponding to bin
-   * @return Bin
+   * @param object Avro value to convert
+   * @param schema Avro schema describing {@code object}
+   * @return the Aerospike value, or null for a null value under a UNION schema
    */
-  private Bin getBin(String binName, Object value, Field field) {
-
-    boolean isStringType = false;
-    if (field.schema().getType().equals(Schema.Type.STRING))
-      isStringType = true;
-    if (field.schema().getType().equals(Schema.Type.UNION)) {
-      for (Schema schema : field.schema().getTypes()) {
-        if (schema.getName().equals("string"))
-          isStringType = true;
-      }
+  private Value getSerializableValue(Object object, Schema schema) {
+
+    Value value = null;
+    switch (schema.getType()) {
+      case UNION:
+        if (object != null) {
+          int schemaPos = getUnionSchema(object, schema);
+          Schema unionSchema = schema.getTypes().get(schemaPos);
+          value = getSerializableValue(object, unionSchema);
+        }
+        break;
+      case STRING:
+      case ENUM:
+        // Enum symbols are stored by name so that the read path can resolve them from a String.
+        value = Value.get(object.toString());
+        break;
+      case BYTES:
+        // Copy only the readable bytes; array() would expose the whole backing array.
+        ByteBuffer buffer = ((ByteBuffer) object).duplicate();
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        value = Value.get(bytes);
+        break;
+      case MAP:
+        Map<Object, Object> newMap = new HashMap<>();
+        Map<?, ?> avroMap = (Map<?, ?>) object;
+        for (Map.Entry<?, ?> entry : avroMap.entrySet()) {
+          newMap.put(entry.getKey().toString(),
+              getSerializableValue(entry.getValue(), schema.getValueType()));
+        }
+        value = Value.get(newMap);
+        break;
+      case ARRAY:
+        List<Object> objectList = new ArrayList<>();
+        for (Object obj : (List<?>) object) {
+          objectList.add(getSerializableValue(obj, schema.getElementType()));
+        }
+        value = Value.get(objectList);
+        break;
+      default:
+        value = Value.get(object);
+        break;
     }
-
-    if (isStringType)
-      return new Bin(binName, Value.get(value.toString()));
-    else
-      return new Bin(binName, Value.get(value));
+    return value;
   }
 
   /**
@@ -235,46 +319,185 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
   /**
    * Method to set a field in the persistent object
    *
-   * @param field field name
+   * @param fieldName field name
    * @param record record retrieved from database
    * @param persistent persistent object for the field to be set
    */
-  private void setPersistentField(String field, Record record, T persistent) {
+  private void setPersistentField(String fieldName, Record record, T persistent) {
 
-    String binName = aerospikeParameters.getAerospikeMapping().getBinName(field);
+    String binName = aerospikeParameters.getAerospikeMapping().getBinName(fieldName);
     if (binName == null) {
       LOG.error("Aerospike mapping for field {} not found. Wrong gora-aerospike-mapping.xml",
-          field);
-      throw new RuntimeException("Aerospike mapping for field [" + field + "] not found. "
+          fieldName);
+      throw new RuntimeException("Aerospike mapping for field [" + fieldName + "] not found. "
           + "Wrong gora-aerospike-mapping.xml?");
     }
 
-    Schema.Type fieldDataType = fieldMap.get(field).schema().getType();
-    String binDataType = record.bins.get(field).getClass().getSimpleName();
+    // Bins are keyed by the mapped bin name, which may differ from the Avro field name.
+    if (record.bins.get(binName) == null) {
+      return;
+    }
+    String binDataType = record.bins.get(binName).getClass().getSimpleName();
     Object binValue = record.bins.get(binName);
-    if (fieldDataType.toString().equalsIgnoreCase(binDataType)) {
-      persistent.put(field, record.bins.get(binName));
-    } else {
-      switch (fieldDataType) {
-        case UNION:
-          Schema unionSchema = fieldMap.get(field).schema();
-          if (unionSchema.getTypes().size() == 2) {
-            Schema.Type type0 = unionSchema.getTypes().get(0).getType();
-            Schema.Type type1 = unionSchema.getTypes().get(1).getType();
-
-            if ((type0.equals(Schema.Type.NULL)) || (type1.equals(Schema.Type.NULL))) {
-              persistent.put(field, binValue);
-            }
-          }
+    persistent.put(fieldName,
+        getDeserializedObject(binValue, binDataType, fieldMap.get(fieldName).schema()));
+  }
+
+  /**
+   * Converts a value read from an Aerospike bin back into the Java representation that Avro
+   * expects for {@code schema} (for example Utf8 for strings, ByteBuffer for bytes).
+   *
+   * @param binValue value retrieved from database (may be null)
+   * @param binDataType simple class name of {@code binValue}, or null when it is null
+   * @param schema corresponding schema in the persistent class
+   * @return the deserialized value
+   */
+  private Object getDeserializedObject(Object binValue, String binDataType, Schema schema) {
+
+    Object result;
+    switch (schema.getType()) {
+
+      case MAP:
+        Map<?, ?> rawMap = (Map<?, ?>) binValue;
+        Map<Utf8, Object> deserializableMap = new HashMap<>();
+        if (rawMap == null) {
+          result = new DirtyMapWrapper<>(deserializableMap);
           break;
-        case INT:
-          if (binDataType.equalsIgnoreCase("long")) {
-            persistent.put(field, Math.toIntExact((Long) binValue));
+        }
+        Schema innerSchema = schema.getValueType();
+        for (Map.Entry<?, ?> e : rawMap.entrySet()) {
+          Object rawValue = e.getValue();
+          // Map values may be null (e.g. a union with null), so never call getClass() on them.
+          Object obj = getDeserializedObject(rawValue,
+              rawValue == null ? null : rawValue.getClass().getSimpleName(), innerSchema);
+          if (e.getKey() instanceof Utf8) {
+            deserializableMap.put((Utf8) e.getKey(), obj);
+          } else {
+            deserializableMap.put(new Utf8(e.getKey().toString()), obj);
           }
-          break;
-        default:
-          break;
-      }
+        }
+        result = new DirtyMapWrapper<>(deserializableMap);
+        break;
+
+      case ARRAY:
+        List<?> rawList = (List<?>) binValue;
+        List<Object> deserializableList = new ArrayList<>();
+        if (rawList == null) {
+          return new DirtyListWrapper<>(deserializableList);
+        }
+        for (Object item : rawList) {
+          Object obj = getDeserializedObject(item,
+              item == null ? null : item.getClass().getSimpleName(), schema.getElementType());
+          deserializableList.add(obj);
+        }
+        result = new DirtyListWrapper<>(deserializableList);
+        break;
+
+      case RECORD:
+        // NOTE(review): nested records are cast as-is; confirm that what the default branch of
+        // getSerializableValue writes is read back as a PersistentBase.
+        result = (PersistentBase) binValue;
+        break;
+
+      case UNION:
+        int index = getUnionSchema(binValue, schema);
+        Schema resolvedSchema = schema.getTypes().get(index);
+        result = getDeserializedObject(binValue, binDataType, resolvedSchema);
+        break;
+
+      case ENUM:
+        result = AvroUtils.getEnumValue(schema, (String) binValue);
+        break;
+
+      case BYTES:
+        result = ByteBuffer.wrap((byte[]) binValue);
+        break;
+
+      case STRING:
+        if (binValue instanceof Utf8) {
+          result = binValue;
+        } else {
+          result = new Utf8((String) binValue);
+        }
+        break;
+
+      case INT:
+        // Integer bins may come back as Long; narrow them for Avro int fields.
+        if ("long".equalsIgnoreCase(binDataType)) {
+          result = Math.toIntExact((Long) binValue);
+        } else {
+          result = binValue;
+        }
+        break;
+
+      default:
+        result = binValue;
     }
+    return result;
+  }
+
+  /**
+   * Returns the position of the UNION branch that matches the runtime type of a value. Exact
+   * Java-type matches are tried first; a second, lenient pass maps values whose type changes on
+   * the way through Aerospike (Long for INT, String for ENUM) onto the corresponding branch. A
+   * null value resolves to the NULL branch.
+   *
+   * @param instanceValue value that the object holds, possibly null
+   * @param unionSchema union schema containing all of the data types
+   * @return the matching schema position, or 0 when no branch matches
+   */
+  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+    List<Schema> types = unionSchema.getTypes();
+    for (int pos = 0; pos < types.size(); pos++) {
+      if (isExactMatch(instanceValue, types.get(pos).getType())) {
+        return pos;
+      }
+    }
+    for (int pos = 0; pos < types.size(); pos++) {
+      Schema.Type schemaType = types.get(pos).getType();
+      if ((instanceValue instanceof Long && schemaType.equals(Schema.Type.INT))
+          || (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.ENUM))) {
+        return pos;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Checks whether a value's Java type is the natural representation of an Avro schema type.
+   *
+   * @param value value to inspect, possibly null
+   * @param schemaType type of one union branch
+   * @return true when the value belongs to a branch of that type
+   */
+  private boolean isExactMatch(Object value, Schema.Type schemaType) {
+    switch (schemaType) {
+      case NULL:
+        return value == null;
+      case STRING:
+        return value instanceof CharSequence;
+      case BYTES:
+        return value instanceof ByteBuffer || value instanceof byte[];
+      case INT:
+        return value instanceof Integer;
+      case LONG:
+        return value instanceof Long;
+      case DOUBLE:
+        return value instanceof Double;
+      case FLOAT:
+        return value instanceof Float;
+      case BOOLEAN:
+        return value instanceof Boolean;
+      case MAP:
+        return value instanceof Map;
+      case ARRAY:
+        return value instanceof List;
+      case RECORD:
+        return value instanceof Persistent;
+      case ENUM:
+        return value instanceof Enum<?>;
+      default:
+        return false;
+    }
   }
 }