Implement put, get, delete

The operations put, get and delete are implemented for the
Ignite backend. In addition, primary key mapping is 
improved.

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/db7540e0
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/db7540e0
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/db7540e0

Branch: refs/heads/master
Commit: db7540e045fc07ef39d37163a263615c6e580efe
Parents: b1e2ae4
Author: Carlos M <carlosr...@gmail.com>
Authored: Mon Jul 2 00:50:43 2018 -0500
Committer: Carlos M <carlosr...@gmail.com>
Committed: Mon Jul 2 00:50:43 2018 -0500

----------------------------------------------------------------------
 .../apache/gora/ignite/store/IgniteMapping.java |   6 +-
 .../gora/ignite/store/IgniteMappingBuilder.java |   6 +-
 .../apache/gora/ignite/store/IgniteStore.java   | 323 ++++++++++++++++++-
 .../gora/ignite/utils/IgniteSQLBuilder.java     |  90 +++++-
 .../src/test/resources/gora-ignite-mapping.xml  |  10 +-
 5 files changed, 412 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/db7540e0/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
index 2e59da1..c0e7a98 100644
--- a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMapping.java
@@ -27,7 +27,7 @@ public class IgniteMapping {
 
   private String tableName;
   private Map<String, Column> fields;
-  private List<String> primaryKey;
+  private List<Column> primaryKey;
 
   public IgniteMapping() {
     fields = new HashMap<>();
@@ -49,11 +49,11 @@ public class IgniteMapping {
     this.fields = fields;
   }
 
-  public List<String> getPrimaryKey() {
+  public List<Column> getPrimaryKey() {
     return primaryKey;
   }
 
-  public void setPrimaryKey(List<String> primaryKey) {
+  public void setPrimaryKey(List<Column> primaryKey) {
     this.primaryKey = primaryKey;
   }
 

http://git-wip-us.apache.org/repos/asf/gora/blob/db7540e0/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMappingBuilder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMappingBuilder.java
 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMappingBuilder.java
index 81a3fae..3d245ea 100644
--- 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMappingBuilder.java
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteMappingBuilder.java
@@ -80,9 +80,11 @@ public class IgniteMappingBuilder<K, T extends 
PersistentBase> {
           String tableName = dataStore.getSchemaName(tableNameFromMapping, 
dataStore.getPersistentClass());
           igniteMapping.setTableName(tableName);
           List<Element> prColumns = classElement.getChildren("primarykey");
-          List<String> prFields = new ArrayList<>();
+          List<Column> prFields = new ArrayList<>();
           for (Element aPrimaryKey : prColumns) {
-            prFields.add(aPrimaryKey.getAttributeValue("column"));
+            String name = aPrimaryKey.getAttributeValue("column");
+            String type = aPrimaryKey.getAttributeValue("type");
+            prFields.add(new Column(name, Column.FieldType.valueOf(type)));
           }
           igniteMapping.setPrimaryKey(prFields);
           List<Element> fields = classElement.getChildren("field");

http://git-wip-us.apache.org/repos/asf/gora/blob/db7540e0/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
index ebc2943..497e161 100644
--- a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
@@ -17,20 +17,34 @@
 package org.apache.gora.ignite.store;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
 import org.apache.gora.ignite.utils.IgniteSQLBuilder;
+import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
 import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +63,16 @@ public class IgniteStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T
   private IgniteMapping igniteMapping;
   private Connection connection;
 
+  /*
+   * Create a threadlocal map for the datum readers and writers, because they
+   * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650). When
+   * they are thread safe, it is possible to maintain a single reader and 
writer
+   * pair for every schema, instead of one for every thread.
+   */
+  public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> 
readerMap = new ConcurrentHashMap<>();
+
+  public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> 
writerMap = new ConcurrentHashMap<>();
+
   @Override
   public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) throws GoraException {
 
@@ -161,17 +185,151 @@ public class IgniteStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T
 
   @Override
   public T get(K key, String[] fields) throws GoraException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    String[] avFields = getFieldsToQuery(fields);
+    Object[] keyl = null;
+    if (igniteMapping.getPrimaryKey().size() == 1) {
+      keyl = new Object[]{key};
+    } else {
+      //Composite key pending
+    }
+    //Avro fields to Ignite fields
+    List<String> dbFields = new ArrayList<>();
+    for (String af : avFields) {
+      dbFields.add(igniteMapping.getFields().get(af).getName());
+    }
+    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, dbFields);
+    try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
+      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
+      ResultSet rs = stmt.executeQuery();
+      boolean data = rs.next();
+      T resp = null;
+      if (data) {
+        resp = newInstance(rs, fields);
+        if (rs.next()) {
+          LOG.warn("Multiple results for primary key {} in the schema {}, 
ignoring additional rows.", keyl, igniteMapping.getTableName());
+        }
+      }
+      rs.close();
+      return resp;
+    } catch (SQLException | IOException ex) {
+      throw new GoraException(ex);
+    }
+
+  }
+
+  public T newInstance(ResultSet rs, String[] fields) throws GoraException, 
SQLException, IOException {
+    fields = getFieldsToQuery(fields);
+    T persistent = newPersistent();
+    for (String f : fields) {
+      Schema.Field field = fieldMap.get(f);
+      Schema fieldSchema = field.schema();
+      String dbField = igniteMapping.getFields().get(f).getName();
+      Object sv = rs.getObject(dbField);
+      if (sv == null) {
+        continue;
+      }
+      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+      persistent.put(field.pos(), v);
+      persistent.setDirty(field.pos());
+    }
+    return persistent;
+  }
+
+  private Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
+      Object igniteValue, T persistent) throws IOException {
+    Object fieldValue = null;
+    switch (fieldSchema.getType()) {
+      case MAP:
+      case ARRAY:
+      case RECORD:
+        @SuppressWarnings("rawtypes") SpecificDatumReader reader = 
getDatumReader(fieldSchema);
+        fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
+            persistent.get(field.pos()));
+        break;
+      case ENUM:
+        fieldValue = AvroUtils.getEnumValue(fieldSchema, 
igniteValue.toString());
+        break;
+      case FIXED:
+        break;
+      case BYTES:
+        fieldValue = ByteBuffer.wrap((byte[]) igniteValue);
+        break;
+      case STRING:
+        fieldValue = new Utf8(igniteValue.toString());
+        break;
+      case UNION:
+        if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+          int schemaPos = getUnionSchema(igniteValue, fieldSchema);
+          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+          fieldValue = deserializeFieldValue(field, unionSchema, igniteValue, 
persistent);
+        } else {
+          reader = getDatumReader(fieldSchema);
+          fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
+              persistent.get(field.pos()));
+        }
+        break;
+      default:
+        fieldValue = igniteValue;
+    }
+    return fieldValue;
+
   }
 
   @Override
   public void put(K key, T obj) throws GoraException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    try {
+      if (obj.isDirty()) {
+        Schema schema = obj.getSchema();
+        List<Schema.Field> fields = schema.getFields();
+        Map<Column, Object> data = new HashMap<>();
+        if (igniteMapping.getPrimaryKey().size() == 1) {
+          Column getKey = igniteMapping.getPrimaryKey().get(0);
+          data.put(getKey, key);
+        } else {
+          //Composite keys pending..
+        }
+        for (Schema.Field field : fields) {
+          Column get = igniteMapping.getFields().get(field.name());
+          Object v = obj.get(field.pos());
+          if (get != null && v != null) {
+            Schema fieldSchema = field.schema();
+            Object serializedObj = serializeFieldValue(get, fieldSchema, v);
+            data.put(get, serializedObj);
+          }
+        }
+        String baseInsertStatement = 
IgniteSQLBuilder.baseInsertStatement(igniteMapping, data);
+        try (PreparedStatement stmt = 
connection.prepareStatement(baseInsertStatement)) {
+          IgniteSQLBuilder.fillInsertStatement(stmt, data);
+          stmt.executeUpdate();
+        } catch (SQLException ex) {
+          throw new GoraException(ex);
+        }
+      } else {
+        LOG.info("Ignored putting object {} in the store as it is neither "
+            + "new, neither dirty.", new Object[]{obj});
+      }
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
   }
 
   @Override
   public boolean delete(K key) throws GoraException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    String deleteQuery = null;
+    Object[] keyl = null;
+    if (igniteMapping.getPrimaryKey().size() == 1) {
+      deleteQuery = IgniteSQLBuilder.delete(igniteMapping);
+      keyl = new Object[]{key};
+    } else {
+      //Composite key pending
+    }
+    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery)) {
+      IgniteSQLBuilder.fillDeleteStatement(stmt, igniteMapping, keyl);
+      stmt.executeUpdate();
+      return true;
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
   }
 
   @Override
@@ -196,11 +354,7 @@ public class IgniteStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T
 
   @Override
   public void flush() throws GoraException {
-    try {
-      connection.commit();
-    } catch (Exception e) {
-      throw new GoraException(e);
-    }
+    //Auto-commit mode by default
   }
 
   @Override
@@ -213,4 +367,157 @@ public class IgniteStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T
     }
   }
 
+  private Object serializeFieldValue(Column get, Schema fieldSchema, Object 
fieldValue) {
+    Object output = fieldValue;
+    switch (fieldSchema.getType()) {
+      case ARRAY:
+      case MAP:
+      case RECORD:
+        byte[] data = null;
+        try {
+          @SuppressWarnings("rawtypes")
+          SpecificDatumWriter writer = getDatumWriter(fieldSchema);
+          data = IOUtils.serialize(writer, fieldValue);
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e);
+        }
+        output = data;
+        break;
+      case UNION:
+        if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+          int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+          output = serializeFieldValue(get, unionSchema, fieldValue);
+        } else {
+          data = null;
+          try {
+            @SuppressWarnings("rawtypes")
+            SpecificDatumWriter writer = getDatumWriter(fieldSchema);
+            data = IOUtils.serialize(writer, fieldValue);
+          } catch (IOException e) {
+            LOG.error(e.getMessage(), e);
+          }
+          output = data;
+        }
+        break;
+      case FIXED:
+        break;
+      case ENUM:
+      case STRING:
+        output = fieldValue.toString();
+        break;
+      case BYTES:
+        output = ((ByteBuffer) fieldValue).array();
+        break;
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+        output = fieldValue;
+        break;
+      case NULL:
+        break;
+      default:
+        throw new AssertionError(fieldSchema.getType().name());
+
+    }
+    return output;
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Method to retrieve the corresponding schema type index of a particular
+   * object having UNION schema. As UNION type can have one or more types and 
at
+   * a given instance, it holds an object of only one type of the defined 
types,
+   * this method is used to figure out the corresponding instance's schema type
+   * index.
+   *
+   * @param instanceValue value that the object holds
+   * @param unionSchema union schema containing all of the data types
+   * @return the unionSchemaPosition corresponding schema position
+   */
+  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+    int unionSchemaPos = 0;
+    for (Schema currentSchema : unionSchema.getTypes()) {
+      Schema.Type schemaType = currentSchema.getType();
+      if (instanceValue instanceof CharSequence && 
schemaType.equals(Schema.Type.STRING)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof ByteBuffer && 
schemaType.equals(Schema.Type.BYTES)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.BYTES)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Integer && 
schemaType.equals(Schema.Type.INT)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Long && 
schemaType.equals(Schema.Type.LONG)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Double && 
schemaType.equals(Schema.Type.DOUBLE)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Float && 
schemaType.equals(Schema.Type.FLOAT)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Boolean && 
schemaType.equals(Schema.Type.BOOLEAN)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof List && 
schemaType.equals(Schema.Type.ARRAY)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Persistent && 
schemaType.equals(Schema.Type.RECORD)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.MAP)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.RECORD)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.ARRAY)) {
+        return unionSchemaPos;
+      }
+      unionSchemaPos++;
+    }
+    return 0;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumReader getDatumReader(Schema fieldSchema) {
+    SpecificDatumReader<?> reader = readerMap.get(fieldSchema);
+    if (reader == null) {
+      reader = new SpecificDatumReader(fieldSchema);// ignore dirty bits
+      SpecificDatumReader localReader = null;
+      if ((localReader = readerMap.putIfAbsent(fieldSchema, reader)) != null) {
+        reader = localReader;
+      }
+    }
+    return reader;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumWriter getDatumWriter(Schema fieldSchema) {
+    SpecificDatumWriter writer = writerMap.get(fieldSchema);
+    if (writer == null) {
+      writer = new SpecificDatumWriter(fieldSchema);// ignore dirty bits
+      writerMap.put(fieldSchema, writer);
+    }
+
+    return writer;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/db7540e0/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
index 0480fd4..92014c1 100644
--- 
a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
@@ -17,10 +17,15 @@
 package org.apache.gora.ignite.utils;
 
 import avro.shaded.com.google.common.collect.Lists;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.gora.ignite.store.Column;
 import org.apache.gora.ignite.store.IgniteMapping;
 
@@ -44,9 +49,9 @@ public class IgniteSQLBuilder {
     sqlBuilder.append("CREATE TABLE ");
     sqlBuilder.append(mapping.getTableName());
     sqlBuilder.append("(");
-    ArrayList<Map.Entry<String, Column>> fieldsList = 
Lists.newArrayList(mapping.getFields().entrySet());
-    for (Map.Entry<String, Column> aField : fieldsList) {
-      Column aColumn = aField.getValue();
+    ArrayList<Column> fieldsList = Lists.newArrayList(mapping.getPrimaryKey());
+    fieldsList.addAll(Lists.newArrayList(mapping.getFields().values()));
+    for (Column aColumn : fieldsList) {
       String name = aColumn.getName();
       Column.FieldType dataType = aColumn.getDataType();
       sqlBuilder.append(name).append(" 
").append(dataType.toString()).append(",");
@@ -54,16 +59,91 @@ public class IgniteSQLBuilder {
     sqlBuilder.append("PRIMARY KEY ");
     sqlBuilder.append("(");
     for (int i = 0; i < mapping.getPrimaryKey().size(); i++) {
-      sqlBuilder.append(mapping.getPrimaryKey().get(i));
+      sqlBuilder.append(mapping.getPrimaryKey().get(i).getName());
       sqlBuilder.append(i == mapping.getPrimaryKey().size() - 1 ? "" : ",");
     }
     sqlBuilder.append(")");
     sqlBuilder.append(");");
     return sqlBuilder.toString();
   }
-  
+
   public static String dropTable(String tableName) {
     return format("DROP TABLE IF EXISTS {0} ;", tableName);
   }
 
+  public static String baseInsertStatement(IgniteMapping mapping, Map<Column, 
Object> data) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("MERGE INTO ");
+    sqlBuilder.append(mapping.getTableName());
+    sqlBuilder.append(" (");
+    List<Entry<Column, Object>> list = new ArrayList<>(data.entrySet());
+    for (int i = 0; i < list.size(); i++) {
+      sqlBuilder.append(list.get(i).getKey().getName());
+      sqlBuilder.append(i == list.size() - 1 ? "" : ",");
+    }
+    sqlBuilder.append(")");
+    sqlBuilder.append(" VALUES ");
+    sqlBuilder.append(" (");
+    for (int i = 0; i < list.size(); i++) {
+      sqlBuilder.append("?");
+      sqlBuilder.append(i == list.size() - 1 ? "" : ",");
+    }
+    sqlBuilder.append(" )");
+    return sqlBuilder.toString();
+  }
+
+  public static void fillInsertStatement(PreparedStatement st, Map<Column, 
Object> data) throws SQLException {
+    List<Entry<Column, Object>> list = new ArrayList<>(data.entrySet());
+    for (int i = 0; i < list.size(); i++) {
+      int j = i + 1;
+      st.setObject(j, list.get(i).getValue());
+    }
+  }
+
+  public static String delete(IgniteMapping mapping) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("DELETE FROM ");
+    sqlBuilder.append(mapping.getTableName());
+    sqlBuilder.append(" WHERE ");
+    for (int i = 0; i < mapping.getPrimaryKey().size(); i++) {
+      sqlBuilder.append(mapping.getPrimaryKey().get(i).getName());
+      sqlBuilder.append("= ? ");
+      sqlBuilder.append(i == mapping.getPrimaryKey().size() - 1 ? "" : " AND 
");
+    }
+    return sqlBuilder.toString();
+  }
+
+  public static void fillDeleteStatement(PreparedStatement st, IgniteMapping 
mapping, Object... data) throws SQLException {
+    assert mapping.getPrimaryKey().size() == data.length;
+    for (int i = 0; i < mapping.getPrimaryKey().size(); i++) {
+      int j = i + 1;
+      st.setObject(j, data[i]);
+    }
+  }
+
+  public static String selectGet(IgniteMapping mapping, List<String> fields) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("SELECT ");
+    for (int i = 0; i < fields.size(); i++) {
+      sqlBuilder.append(fields.get(i));
+      sqlBuilder.append(i == fields.size() - 1 ? "" : " , ");
+    }
+    sqlBuilder.append(" FROM ");
+    sqlBuilder.append(mapping.getTableName());
+    sqlBuilder.append(" WHERE ");
+    for (int i = 0; i < mapping.getPrimaryKey().size(); i++) {
+      sqlBuilder.append(mapping.getPrimaryKey().get(i).getName());
+      sqlBuilder.append("= ? ");
+      sqlBuilder.append(i == mapping.getPrimaryKey().size() - 1 ? "" : " AND 
");
+    }
+    return sqlBuilder.toString();
+  }
+
+  public static void fillSelectStatement(PreparedStatement st, IgniteMapping 
mapping, Object... data) throws SQLException {
+    assert mapping.getPrimaryKey().size() == data.length;
+    for (int i = 0; i < mapping.getPrimaryKey().size(); i++) {
+      int j = i + 1;
+      st.setObject(j, data[i]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/db7540e0/gora-ignite/src/test/resources/gora-ignite-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-ignite/src/test/resources/gora-ignite-mapping.xml 
b/gora-ignite/src/test/resources/gora-ignite-mapping.xml
index 3c13774..6368e18 100644
--- a/gora-ignite/src/test/resources/gora-ignite-mapping.xml
+++ b/gora-ignite/src/test/resources/gora-ignite-mapping.xml
@@ -19,17 +19,17 @@
 
 <gora-otd>
   <class name="org.apache.gora.examples.generated.Employee" 
keyClass="java.lang.String" table="Employee">
-    <primarykey column="ssn"/>
+    <primarykey column="pkssn" type="VARCHAR" />
     <field name="ssn" column="ssn" type="VARCHAR"/>
     <field name="name" column="name" type="VARCHAR"/>
     <field name="dateOfBirth" column="dateOfBirth" type="BIGINT"/>
     <field name="salary" column="salary" type="INT"/>
-    <field name="boss" column="boss" type="VARCHAR"/>
-    <field name="webpage" column="webpage" type="VARCHAR"/>
+    <field name="boss" column="boss" type="BINARY"/>
+    <field name="webpage" column="webpage" type="BINARY"/>
   </class>
   
   <class name="org.apache.gora.examples.generated.WebPage" 
keyClass="java.lang.String" table="WebPage">
-    <primarykey column="url"/>
+    <primarykey column="pkurl" type="VARCHAR"/>
     <field name="url" column="url" type="VARCHAR"/>
     <field name="content" column="content" type="BINARY"/>
     <field name="parsedContent" column="parsedContent" type="BINARY"/>
@@ -37,6 +37,6 @@
     <field name="headers" column="headers" type="BINARY"/>     
     <field name="metadata" column="metadata" type="BINARY"/>
     <field name="byteData" column="byteData" type="BINARY"/>
-    <field name="stringData" column="stringData" type="VARCHAR"/>
+    <field name="stringData" column="stringData" type="BINARY"/>
   </class>
 </gora-otd>

Reply via email to