Add get functionality for aerospike module

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/00d392f2
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/00d392f2
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/00d392f2

Branch: refs/heads/master
Commit: 00d392f2c71e4c6b3e1c8a2fefff0b30afd7c05a
Parents: eae36f4
Author: nishadi <ndime...@gmail.com>
Authored: Thu Jun 22 10:43:16 2017 +0530
Committer: nishadi <ndime...@gmail.com>
Committed: Thu Jun 22 10:43:16 2017 +0530

----------------------------------------------------------------------
 .../gora/aerospike/store/AerospikeMapping.java  |   6 +-
 .../gora/aerospike/store/AerospikeStore.java    | 119 +++++++++++++++++--
 2 files changed, 115 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/00d392f2/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
index ae8284d..1a17241 100644
--- 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -74,4 +74,8 @@ public class AerospikeMapping {
   /**
    * Replaces the bin mapping held by this object with the given map.
    *
    * @param binMapping map to store; it is kept by reference, not copied
    */
   public void setBinMapping(Map<String, String> binMapping) {
     this.binMapping = binMapping;
   }
+
+  /**
+   * Looks up the entry stored in the bin mapping under the given field name.
+   *
+   * @param fieldName name of the field to look up
+   * @return the value the bin mapping holds for that field name, or null
+   *         when the mapping has no entry for it
+   */
+  public String getBinName(String fieldName) {
+    final String mappedBinName = binMapping.get(fieldName);
+    return mappedBinName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/00d392f2/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index df2f38d..653cbf8 100644
--- 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -28,7 +28,6 @@ import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ import org.slf4j.LoggerFactory;
  */
 public class AerospikeStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
 
-  public static final Logger LOG = 
LoggerFactory.getLogger(AerospikeStore.class);
+  public static final Logger logger = 
LoggerFactory.getLogger(AerospikeStore.class);
 
   private static final String PARSE_MAPPING_FILE_KEY = 
"gora.aerospike.mapping.file";
 
@@ -51,7 +50,17 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
 
   private AerospikeParameters aerospikeParameters;
 
-  @Override public void initialize(Class<K> keyClass, Class<T> persistentClass,
+  /**
+   * {@inheritDoc}
+   * Initializing the aerospike datastore reads the mapping file, sets the
+   * basic aerospike specific parameters and creates the client with the
+   * user defined policies.
+   *
+   * @param keyClass        key class
+   * @param persistentClass persistent class
+   * @param properties      properties
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
           Properties properties) {
     super.initialize(keyClass, persistentClass, properties);
 
@@ -87,11 +96,36 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
     return true;
   }
 
-  @Override public T get(K key, String[] fields) {
-    return null;
+  /**
+   * {@inheritDoc}
+   *
+   * @param key    the key of the object
+   * @param fields the fields required in the object. Pass null, to retrieve 
all fields
+   * @return the Object corresponding to the key or null if it cannot be found
+   */
+  @Override
+  public T get(K key, String[] fields) {
+    fields = getFieldsToQuery(fields);
+    Key recordKey = new 
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
+            aerospikeParameters.getAerospikeMapping().getSet(), 
Value.get(key));
+    Record record = aerospikeClient
+            .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), 
recordKey, fields);
+    if (record == null) {
+      return null;
+    }
+    return createPersistentInstance(record, fields);
   }
 
-  @Override public void put(K key, T persistent) {
+  /**
+   * Method to insert the persistent objects with the given key to the 
aerospike database server.
+   * In writing the records, the policy defined in the mapping file is used to 
decide on the
+   * behaviour of transaction handling.
+   *
+   * @param key         key of the object
+   * @param persistent  object to be persisted
+   */
+  @Override
+  public void put(K key, T persistent) {
 
     Key recordKey = new 
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
             aerospikeParameters.getAerospikeMapping().getSet(), 
Value.get(key));
@@ -131,10 +165,15 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
     return null;
   }
 
+  @Override
   public void flush() {
     // Intentionally empty: this store performs no work on flush.
   }
 
-  @Override public void close() {
+  /**
+   * Method to close aerospike client connections to database server nodes.
+   * Delegates directly to the close method of the aerospike client held by
+   * this store.
+   */
+  @Override
+  public void close() {
     aerospikeClient.close();
   }
 
@@ -143,11 +182,12 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
    * This method provides those utf8 valued bin values as strings
    * for aerospike Value to obtain the corresponding bin value,
    * and returns the Bin
+   * Bin is the concept in Aerospike equivalent to a column in RDBMS
    *
    * @param binName name of the bin
    * @param value   value of the bin
    * @param field   field corresponding to bin
-   * @return
+   * @return Bin
    */
   private Bin getBin(String binName, Object value, Field field) {
 
@@ -166,4 +206,65 @@ public class AerospikeStore<K, T extends PersistentBase> 
extends DataStoreBase<K
     else
       return new Bin(binName, Value.get(value));
   }
+
+  /**
+   * Method to create a persistent object given the retrieved record
+   * from Aerospike database
+   *
+   * @param record record retrieved from database
+   * @param fields fields
+   * @return persistent object created
+   */
+  private T createPersistentInstance(Record record, String[] fields) {
+
+    T persistent = newPersistent();
+    for (String field : fields) {
+      setPersistentField(field, record, persistent);
+    }
+    return persistent;
+  }
+
+  /**
+   * Method to set a single field of the persistent object from the bin that
+   * the mapping associates with that field.
+   * <p>
+   * The bin is always looked up by its mapped bin name. If the record holds
+   * no value for that bin, the field is left untouched. A value whose class
+   * simple name equals the Avro type name (ignoring case) is stored as is; a
+   * two-branch union with a null branch also takes the value as is; an INT
+   * field backed by a Long value is narrowed with an overflow check. Any
+   * other combination leaves the field unset.
+   *
+   * @param field      field name
+   * @param record     record retrieved from database
+   * @param persistent persistent object for the field to be set
+   * @throws RuntimeException    if the mapping has no bin for the field
+   * @throws ArithmeticException if a Long value does not fit in an INT field
+   */
+  private void setPersistentField(String field, Record record, T persistent) {
+    String binName = aerospikeParameters.getAerospikeMapping().getBinName(field);
+    if (binName == null) {
+      throw new RuntimeException(
+              "Aerospike mapping for field [" + field + "] not found. "
+              + "Wrong gora-aerospike-mapping.xml?");
+    }
+
+    // Look the value up by bin name, not field name: the two differ whenever
+    // the mapping renames a field, and a lookup by field name then misses.
+    Object binValue = record.bins.get(binName);
+    if (binValue == null) {
+      // The record carries no value for this bin; nothing to copy.
+      return;
+    }
+
+    Schema fieldSchema = fieldMap.get(field).schema();
+    Schema.Type fieldDataType = fieldSchema.getType();
+    String binDataType = binValue.getClass().getSimpleName();
+
+    if (fieldDataType.toString().equalsIgnoreCase(binDataType)) {
+      persistent.put(field, binValue);
+      return;
+    }
+
+    switch (fieldDataType) {
+      case UNION:
+        // Only the "nullable value" shape is handled: exactly two branches,
+        // one of which is NULL.
+        if (fieldSchema.getTypes().size() == 2) {
+          Schema.Type type0 = fieldSchema.getTypes().get(0).getType();
+          Schema.Type type1 = fieldSchema.getTypes().get(1).getType();
+          if (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) {
+            persistent.put(field, binValue);
+          }
+        }
+        break;
+      case INT:
+        // The value arrived as a Long; narrow it, failing loudly on overflow.
+        if (binDataType.equalsIgnoreCase("long")) {
+          persistent.put(field, Math.toIntExact((Long) binValue));
+        }
+        break;
+      default:
+        break;
+    }
+  }
 }

Reply via email to