Update put and get functionality with support for different AVRO data types


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/27b015e5
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/27b015e5
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/27b015e5

Branch: refs/heads/master
Commit: 27b015e5dfdb67f8530e7c110d36aa55d0e42cb1
Parents: 7c3390d
Author: nishadi <ndime...@gmail.com>
Authored: Mon Jun 26 18:35:02 2017 +0530
Committer: nishadi <ndime...@gmail.com>
Committed: Mon Jun 26 18:35:02 2017 +0530

----------------------------------------------------------------------
 .../gora/aerospike/store/AerospikeStore.java    | 310 +++++++++++++++----
 1 file changed, 246 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/27b015e5/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index 9ff3c60..5a880d6 100644
--- 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -17,18 +17,31 @@
 package org.apache.gora.aerospike.store;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.Properties;
 
-import com.aerospike.client.*;
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Record;
+import com.aerospike.client.AerospikeClient;
 import com.aerospike.client.policy.ClientPolicy;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,19 +93,41 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
     LOG.info("Aerospike Gora datastore initialized successfully.");
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema 
creation through the
+   * provided Java client. When records are added to the database, the 
schema is created on
+   * the fly. Thus, schema-related functionality is unavailable in the 
gora-aerospike module.
+   *
+   * @return null
+   */
   @Override
   public String getSchemaName() {
     return null;
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema 
creation through the
+   * provided Java client. When records are added to the database, the 
schema is created on
+   * the fly. Thus, schema creation functionality is unavailable in the 
gora-aerospike module.
+   */
   @Override
   public void createSchema() {
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema 
creation through the
+   * provided Java client. When records are added to the database, the 
schema is created on
+   * the fly. Thus, schema deletion functionality is unavailable in the 
gora-aerospike module.
+   */
   @Override
   public void deleteSchema() {
   }
 
+  /**
+   * Aerospike, being a schemaless database, does not support explicit schema 
creation through the
+   * provided Java client. When records are added to the database, the 
schema is created on
+   * the fly. Thus, a schema existence check is unavailable in the 
gora-aerospike module, and this method always returns true.
+   */
   @Override
   public boolean schemaExists() {
     return true;
@@ -107,9 +142,17 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
    */
   @Override
   public T get(K key, String[] fields) {
+
+    Value keyValue;
+    if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+      keyValue = Value.get(key.toString());
+    } else {
+      keyValue = Value.get(key);
+    }
+
     fields = getFieldsToQuery(fields);
     Key recordKey = new 
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
-            aerospikeParameters.getAerospikeMapping().getSet(), 
Value.get(key));
+            aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
     Record record = aerospikeClient
             .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), 
recordKey, fields);
     if (record == null) {
@@ -129,23 +172,35 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
   @Override
   public void put(K key, T persistent) {
 
+    Value keyValue;
+    if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+      keyValue = Value.get(key.toString());
+    } else {
+      keyValue = Value.get(key);
+    }
+
     Key recordKey = new 
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
-            aerospikeParameters.getAerospikeMapping().getSet(), 
Value.get(key));
+            aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
 
     List<Field> fields = persistent.getSchema().getFields();
     for (int i = 0; i < fields.size(); i++) {
-      String mappingBinName = 
aerospikeParameters.getAerospikeMapping().getBinMapping()
-              .get(fields.get(i).name());
-      if (mappingBinName == null) {
-        LOG.error("Aerospike mapping for field {}#{} not found. Wrong 
gora-aerospike-mapping.xml?",
-                persistent.getClass().getName(), fields.get(i).name());
-        throw new RuntimeException(
-                "Aerospike mapping for field [" + 
persistent.getClass().getName() + "#" + fields
-                        .get(i).name() + "] not found. Wrong 
gora-aerospike-mapping.xml?");
+      Object persistentValue = persistent.get(i);
+      if (persistentValue != null) {
+        String mappingBinName = 
aerospikeParameters.getAerospikeMapping().getBinMapping()
+                .get(fields.get(i).name());
+        if (mappingBinName == null) {
+          LOG.error(
+                  "Aerospike mapping for field {}#{} not found. Wrong 
gora-aerospike-mapping.xml?",
+                  persistent.getClass().getName(), fields.get(i).name());
+          throw new RuntimeException(
+                  "Aerospike mapping for field [" + 
persistent.getClass().getName() + "#" + fields
+                          .get(i).name() + "] not found. Wrong 
gora-aerospike-mapping.xml?");
+        }
+        Bin bin = new Bin(mappingBinName,
+                getSerializableValue(persistentValue, fields.get(i).schema()));
+        aerospikeClient
+                
.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, 
bin);
       }
-      Bin bin = getBin(mappingBinName, persistent.get(i), fields.get(i));
-      aerospikeClient
-              .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), 
recordKey, bin);
     }
   }
 
@@ -188,31 +243,50 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
   }
 
   /**
-   * Aerospike does not support Utf8 format returned from Avro. This method 
provides those utf8
-   * valued bin (column) values as strings for aerospike Value to obtain the 
corresponding bin
-   * value, and returns the Bin (column in RDBMS)
+   * Method to convert a value of the Avro persistent object into a value 
serializable in the database
    *
-   * @param binName name of the bin
-   * @param value   value of the bin
-   * @param field   field corresponding to bin
-   * @return Bin
+   * @param object value of a field of the persistent object
+   * @param schema Avro schema of that field value
+   * @return serializable value
    */
-  private Bin getBin(String binName, Object value, Field field) {
-
-    boolean isStringType = false;
-    if (field.schema().getType().equals(Schema.Type.STRING))
-      isStringType = true;
-    if (field.schema().getType().equals(Schema.Type.UNION)) {
-      for (Schema schema : field.schema().getTypes()) {
-        if (schema.getName().equals("string"))
-          isStringType = true;
-      }
+  private Value getSerializableValue(Object object, Schema schema) {
+
+    Value value = null;
+    switch (schema.getType()) {
+      case UNION:
+        if (object != null) {
+          int schemaPos = getUnionSchema(object, schema);
+          Schema unionSchema = schema.getTypes().get(schemaPos);
+          value = getSerializableValue(object, unionSchema);
+        }
+        break;
+      case STRING:
+        value = Value.get(object.toString());
+        break;
+      case BYTES:
+        value = Value.get(((ByteBuffer) object).array());
+        break;
+      case MAP:
+        Map<Object, Object> newMap = new HashMap<>();
+        Map<?, ?> fieldMap = (Map<?, ?>) object;
+        for (Object key : fieldMap.keySet()) {
+          newMap.put(key.toString(),
+                  getSerializableValue(fieldMap.get(key), 
schema.getValueType()));
+        }
+        value = Value.get(newMap);
+        break;
+      case ARRAY:
+        List<Object> objectList = new ArrayList<>();
+        for (Object obj : (List<Object>) object) {
+          objectList.add(getSerializableValue(obj, schema.getElementType()));
+        }
+        value = Value.get(objectList);
+        break;
+      default:
+        value = Value.get(object);
+        break;
     }
-
-    if (isStringType)
-      return new Bin(binName, Value.get(value.toString()));
-    else
-      return new Bin(binName, Value.get(value));
+    return value;
   }
 
   /**
@@ -235,46 +309,154 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
   /**
    * Method to set a field in the persistent object
    *
-   * @param field      field name
+   * @param fieldName  field name
    * @param record     record retrieved from database
    * @param persistent persistent object for the field to be set
    */
-  private void setPersistentField(String field, Record record, T persistent) {
+  private void setPersistentField(String fieldName, Record record, T 
persistent) {
 
-    String binName = 
aerospikeParameters.getAerospikeMapping().getBinName(field);
+    String binName = 
aerospikeParameters.getAerospikeMapping().getBinName(fieldName);
     if (binName == null) {
       LOG.error("Aerospike mapping for field {} not found. Wrong 
gora-aerospike-mapping.xml",
-              field);
-      throw new RuntimeException("Aerospike mapping for field [" + field + "] 
not found. "
+              fieldName);
+      throw new RuntimeException("Aerospike mapping for field [" + fieldName + 
"] not found. "
               + "Wrong gora-aerospike-mapping.xml?");
     }
-    Schema.Type fieldDataType = fieldMap.get(field).schema().getType();
-    String binDataType = record.bins.get(field).getClass().getSimpleName();
+    if (record.bins.get(fieldName) == null) {
+      return;
+    }
+    String binDataType = record.bins.get(fieldName).getClass().getSimpleName();
     Object binValue = record.bins.get(binName);
 
-    if (fieldDataType.toString().equalsIgnoreCase(binDataType)) {
-      persistent.put(field, record.bins.get(binName));
-    } else {
-      switch (fieldDataType) {
-        case UNION:
-          Schema unionSchema = fieldMap.get(field).schema();
-          if (unionSchema.getTypes().size() == 2) {
-            Schema.Type type0 = unionSchema.getTypes().get(0).getType();
-            Schema.Type type1 = unionSchema.getTypes().get(1).getType();
-
-            if ((type0.equals(Schema.Type.NULL)) || 
(type1.equals(Schema.Type.NULL))) {
-              persistent.put(field, binValue);
-            }
-          }
+    persistent.put(fieldName,
+            getDeserializedObject(binValue, binDataType, 
fieldMap.get(fieldName).schema()));
+  }
+
+  /**
+   * Method to get the Avro-mapped object for a value retrieved 
from the database
+   *
+   * @param binValue    value retrieved from the database
+   * @param binDataType simple class name of the database value
+   * @param schema      corresponding schema in the persistent class
+   * @return value converted to the type expected by the schema
+   */
+  private Object getDeserializedObject(Object binValue, String binDataType, 
Schema schema) {
+
+    Object result;
+    switch (schema.getType()) {
+
+      case MAP:
+        Map<String, Object> rawMap = (Map<String, Object>) binValue;
+        Map<Utf8, Object> deserializableMap = new HashMap<>();
+        if (rawMap == null) {
+          result = new DirtyMapWrapper(deserializableMap);
           break;
-        case INT:
-          if (binDataType.equalsIgnoreCase("long")) {
-            persistent.put(field, Math.toIntExact((Long) binValue));
+        }
+        for (Map.Entry<?, ?> e : rawMap.entrySet()) {
+          Schema innerSchema = schema.getValueType();
+          Object obj = getDeserializedObject(e.getValue(), 
e.getValue().getClass().getSimpleName(),
+                  innerSchema);
+          if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) {
+            deserializableMap.put((Utf8) e.getKey(), obj);
+          } else {
+            deserializableMap.put(new Utf8((String) e.getKey()), obj);
           }
-          break;
-        default:
-          break;
-      }
+        }
+        result = new DirtyMapWrapper<>(deserializableMap);
+        break;
+
+      case ARRAY:
+        List<Object> rawList = (List<Object>) binValue;
+        List<Object> deserializableList = new ArrayList<>();
+        if (rawList == null) {
+          return new DirtyListWrapper(deserializableList);
+        }
+        for (Object item : rawList) {
+          Object obj = getDeserializedObject(item, 
item.getClass().getSimpleName(),
+                  schema.getElementType());
+          deserializableList.add(obj);
+        }
+        result = new DirtyListWrapper<>(deserializableList);
+        break;
+
+      case RECORD:
+        result = (PersistentBase) binValue;
+        break;
+
+      case UNION:
+        int index = getUnionSchema(binValue, schema);
+        Schema resolvedSchema = schema.getTypes().get(index);
+        result = getDeserializedObject(binValue, binDataType, resolvedSchema);
+        break;
+
+      case ENUM:
+        result = AvroUtils.getEnumValue(schema, (String) binValue);
+        break;
+
+      case BYTES:
+        result = ByteBuffer.wrap((byte[]) binValue);
+        break;
+
+      case STRING:
+        if (binValue instanceof org.apache.avro.util.Utf8)
+          result = binValue;
+        else
+          result = new Utf8((String) binValue);
+        break;
+
+      case INT:
+        if (binDataType.equalsIgnoreCase("long")) {
+          result = Math.toIntExact((Long) binValue);
+        } else {
+          result = binValue;
+        }
+        break;
+
+      default:
+        result = binValue;
     }
+    return result;
+  }
+
+  /**
+   * Method to retrieve the index of the schema type that matches a particular 
object having a UNION
+   * schema. A UNION schema can declare one or more types, but at any given time 
it holds an object
+   * of only one of the declared types; this method is used to figure out 
the index of the
+   * declared type that matches the given instance.
+   * If no declared type matches, index 0 is returned.
+   *
+   * @param instanceValue value that the object holds
+   * @param unionSchema   union schema containing all of the data types
+   * @return the position of the matching schema within the union schema
+   */
+  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+    int unionSchemaPos = 0;
+    for (Schema currentSchema : unionSchema.getTypes()) {
+      Schema.Type schemaType = currentSchema.getType();
+      if (instanceValue instanceof CharSequence && 
schemaType.equals(Schema.Type.STRING))
+        return unionSchemaPos;
+      else if (instanceValue instanceof ByteBuffer && 
schemaType.equals(Schema.Type.BYTES))
+        return unionSchemaPos;
+      else if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.BYTES))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Integer && 
schemaType.equals(Schema.Type.INT))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Long && 
schemaType.equals(Schema.Type.LONG))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Double && 
schemaType.equals(Schema.Type.DOUBLE))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Float && 
schemaType.equals(Schema.Type.FLOAT))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Boolean && 
schemaType.equals(Schema.Type.BOOLEAN))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Map && 
schemaType.equals(Schema.Type.MAP))
+        return unionSchemaPos;
+      else if (instanceValue instanceof List && 
schemaType.equals(Schema.Type.ARRAY))
+        return unionSchemaPos;
+      else if (instanceValue instanceof Persistent && 
schemaType.equals(Schema.Type.RECORD))
+        return unionSchemaPos;
+      unionSchemaPos++;
+    }
+    return 0;
   }
 }

Reply via email to