Implement Queries for Ignite

The operations newQuery, execute, deleteByQuery, getPartitions are
implemented for the Ignite backend. The corresponding dependencies
IgniteQuery and IgniteResult are also included.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/87eddc24
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/87eddc24
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/87eddc24

Branch: refs/heads/master
Commit: 87eddc2495cdeec9b28246ea08708540c365f34e
Parents: db7540e
Author: Carlos M <carlosr...@gmail.com>
Authored: Thu Jul 5 23:11:38 2018 -0500
Committer: Carlos M <carlosr...@gmail.com>
Committed: Thu Jul 5 23:11:38 2018 -0500

----------------------------------------------------------------------
 .../apache/gora/ignite/query/IgniteQuery.java   |  32 ++++++
 .../apache/gora/ignite/query/IgniteResult.java  | 101 +++++++++++++++++++
 .../apache/gora/ignite/query/package-info.java  |  20 ++++
 .../apache/gora/ignite/store/IgniteStore.java   |  65 +++++++++++-
 .../gora/ignite/utils/IgniteSQLBuilder.java     |  89 ++++++++++++++++
 .../apache/gora/ignite/utils/package-info.java  |  20 ++++
 6 files changed, 322 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
new file mode 100644
index 0000000..b33e682
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Ignite specific implementation of the {@link Query} interface.
+ */
+public class IgniteQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+  public IgniteQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
new file mode 100644
index 0000000..09f717e
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.query;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.gora.ignite.store.IgniteStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * IgniteResult specific implementation of the
+ * {@link org.apache.gora.query.Result} interface.
+ */
+public class IgniteResult<K, T extends PersistentBase> extends ResultBase<K, 
T> {
+
+  private ResultSet resultSet;
+  private Statement st;
+  private int size;
+
+  public IgniteResult(DataStore<K, T> dataStore, Query<K, T> query) {
+    super(dataStore, query);
+  }
+
+  @Override
+  protected boolean nextInner() throws IOException {
+    try {
+      if (!resultSet.next()) {
+        return false;
+      }
+      key = ((IgniteStore<K, T>) getDataStore()).extractKey(resultSet);
+      persistent = ((IgniteStore<K, T>) getDataStore()).newInstance(resultSet, 
getQuery().getFields());
+      return persistent != null;
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    if (resultSet == null) {
+      return 0;
+    } else if (size == 0) {
+      return 1;
+    } else {
+      return offset / (float) size;
+    }
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (resultSet != null) {
+      try {
+        resultSet.close();
+      } catch (SQLException ex) {
+        throw new IOException(ex);
+      }
+    }
+    if (st != null) {
+      try {
+        st.close();
+      } catch (SQLException ex) {
+        throw new IOException(ex);
+      }
+    }
+  }
+
+  public void setResultSet(ResultSet resultSet) throws SQLException {
+    this.resultSet = resultSet;
+    if (resultSet.last()) {
+      size = resultSet.getRow();
+    } else {
+      size = 0;
+    }
+    resultSet.beforeFirst();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
new file mode 100644
index 0000000..5e463ff
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Ignite datastore related all classes.
+ */
+package org.apache.gora.ignite.query;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
index 497e161..d9f3527 100644
--- a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java
@@ -28,19 +28,24 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.sql.rowset.CachedRowSet;
+import javax.sql.rowset.RowSetFactory;
+import javax.sql.rowset.RowSetProvider;
 import org.apache.avro.Schema;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.ignite.query.IgniteQuery;
+import org.apache.gora.ignite.query.IgniteResult;
 import org.apache.gora.ignite.utils.IgniteSQLBuilder;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.apache.gora.util.AvroUtils;
 import org.apache.gora.util.GoraException;
@@ -334,22 +339,72 @@ public class IgniteStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T
 
   @Override
   public long deleteByQuery(Query<K, T> query) throws GoraException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+
+    String deleteQuery;
+    if (query.getFields() != null && query.getFields().length < 
igniteMapping.getFields().size()) {
+      List<String> dbFields = new ArrayList<>();
+      for (String af : query.getFields()) {
+        dbFields.add(igniteMapping.getFields().get(af).getName());
+      }
+      deleteQuery = IgniteSQLBuilder.deleteQueryFields(igniteMapping, 
dbFields);
+    } else {
+      deleteQuery = IgniteSQLBuilder.deleteQuery(igniteMapping);
+    }
+    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, 
query.getStartKey(), query.getEndKey(), query.getLimit());
+    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery + 
selectQueryWhere)) {
+      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), 
query.getEndKey());
+      stmt.executeUpdate();
+      return 0;
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
   }
 
   @Override
   public Result<K, T> execute(Query<K, T> query) throws GoraException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    String[] fields = getFieldsToQuery(query.getFields());
+    //Avro fields to Ignite fields
+    List<String> dbFields = new ArrayList<>();
+    for (String af : fields) {
+      dbFields.add(igniteMapping.getFields().get(af).getName());
+    }
+    String selectQuery = IgniteSQLBuilder.selectQuery(igniteMapping, dbFields);
+    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, 
query.getStartKey(), query.getEndKey(), query.getLimit());
+    try {
+      PreparedStatement stmt = connection.prepareStatement(selectQuery + 
selectQueryWhere);
+      RowSetFactory factory = RowSetProvider.newFactory();
+      CachedRowSet rowset = factory.createCachedRowSet();
+      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), 
query.getEndKey());
+      ResultSet executeQuery = stmt.executeQuery();
+      rowset.populate(executeQuery);
+      IgniteResult<K, T> igniteResult = new IgniteResult<>(this, query);
+      igniteResult.setResultSet(rowset);
+      return igniteResult;
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  public K extractKey(ResultSet r) throws SQLException {
+    assert igniteMapping.getPrimaryKey().size() == 1;
+    return (K) r.getObject(igniteMapping.getPrimaryKey().get(0).getName());
   }
 
   @Override
   public Query<K, T> newQuery() {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    IgniteQuery<K, T> query = new IgniteQuery<>(this);
+    query.setFields(getFieldsToQuery(null));
+    return query;
   }
 
   @Override
   public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws 
IOException {
-    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+    List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+    PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+        query);
+    partitionQuery.setConf(getConf());
+    partitions.add(partitionQuery);
+    return partitions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
index 92014c1..4c1e314 100644
--- 
a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
+++ 
b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java
@@ -146,4 +146,93 @@ public class IgniteSQLBuilder {
       st.setObject(j, data[i]);
     }
   }
+
+  public static String selectQuery(IgniteMapping mapping, List<String> 
ifields) {
+    List<String> fields = new ArrayList<>();
+    for (Column c : mapping.getPrimaryKey()) {
+      fields.add(c.getName());
+    }
+    fields.addAll(ifields);
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("SELECT ");
+    for (int i = 0; i < fields.size(); i++) {
+      sqlBuilder.append(fields.get(i));
+      sqlBuilder.append(i == fields.size() - 1 ? "" : " , ");
+    }
+    sqlBuilder.append(" FROM ");
+    sqlBuilder.append(mapping.getTableName());
+    return sqlBuilder.toString();
+  }
+
+  public static String deleteQuery(IgniteMapping mapping) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("DELETE FROM ");
+    sqlBuilder.append(mapping.getTableName());
+    return sqlBuilder.toString();
+  }
+
+  public static String deleteQueryFields(IgniteMapping mapping, List<String> 
lsFields) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("UPDATE ");
+    sqlBuilder.append(mapping.getTableName());
+    if (!lsFields.isEmpty()) {
+      sqlBuilder.append(" SET ");
+    }
+    for (int i = 0; i < lsFields.size(); i++) {
+      sqlBuilder.append(lsFields.get(i));
+      sqlBuilder.append(" = null");
+      sqlBuilder.append(i == lsFields.size() - 1 ? "" : " , ");
+    }
+    return sqlBuilder.toString();
+  }
+
+  public static String selectQueryWhere(IgniteMapping mapping, Object sk, 
Object ek, long limit) {
+    //composite keys pending
+    assert mapping.getPrimaryKey().size() == 1;
+    String keycolumn = mapping.getPrimaryKey().get(0).getName();
+    StringBuilder sqlBuilder = new StringBuilder();
+    if (sk != null || ek != null) {
+      sqlBuilder.append(" WHERE ");
+      if (sk != null && ek != null && sk.equals(ek)) {
+        sqlBuilder.append(keycolumn);
+        sqlBuilder.append("= ?");
+      } else {
+        if (sk != null) {
+          sqlBuilder.append(keycolumn);
+          sqlBuilder.append(">= ?");
+        }
+        if (sk != null && ek != null) {
+          sqlBuilder.append(" AND ");
+        }
+        if (ek != null) {
+          sqlBuilder.append(keycolumn);
+          sqlBuilder.append("<= ?");
+        }
+      }
+    }
+    if (limit > 0) {
+      sqlBuilder.append(" LIMIT " + limit);
+    }
+    return sqlBuilder.toString();
+  }
+
+  public static void fillSelectQuery(PreparedStatement st, Object sk, Object 
ek) throws SQLException {
+    if (sk != null || ek != null) {
+      if (sk != null && ek != null && sk.equals(ek)) {
+        st.setObject(1, sk);
+      } else {
+        if (sk != null && ek != null) {
+          st.setObject(1, sk);
+          st.setObject(2, ek);
+        } else {
+          if (sk != null) {
+            st.setObject(1, sk);
+          } else {
+            st.setObject(1, ek);
+          }
+        }
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java 
b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java
new file mode 100644
index 0000000..ce4cd7a
--- /dev/null
+++ b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Ignite datastore related all classes.
+ */
+package org.apache.gora.ignite.utils;
\ No newline at end of file

Reply via email to