Implement Queries for Ignite. The operations newQuery, execute, deleteByQuery, and getPartitions are implemented for the Ignite backend. The corresponding dependencies IgniteQuery and IgniteResult are also included.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/87eddc24 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/87eddc24 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/87eddc24 Branch: refs/heads/master Commit: 87eddc2495cdeec9b28246ea08708540c365f34e Parents: db7540e Author: Carlos M <carlosr...@gmail.com> Authored: Thu Jul 5 23:11:38 2018 -0500 Committer: Carlos M <carlosr...@gmail.com> Committed: Thu Jul 5 23:11:38 2018 -0500 ---------------------------------------------------------------------- .../apache/gora/ignite/query/IgniteQuery.java | 32 ++++++ .../apache/gora/ignite/query/IgniteResult.java | 101 +++++++++++++++++++ .../apache/gora/ignite/query/package-info.java | 20 ++++ .../apache/gora/ignite/store/IgniteStore.java | 65 +++++++++++- .../gora/ignite/utils/IgniteSQLBuilder.java | 89 ++++++++++++++++ .../apache/gora/ignite/utils/package-info.java | 20 ++++ 6 files changed, 322 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java new file mode 100644 index 0000000..b33e682 --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteQuery.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * Ignite specific implementation of the {@link Query} interface. + */ +public class IgniteQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + + public IgniteQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java new file mode 100644 index 0000000..09f717e --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/IgniteResult.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.query; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.gora.ignite.store.IgniteStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +/** + * IgniteResult specific implementation of the + * {@link org.apache.gora.query.Result} interface. + */ +public class IgniteResult<K, T extends PersistentBase> extends ResultBase<K, T> { + + private ResultSet resultSet; + private Statement st; + private int size; + + public IgniteResult(DataStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + @Override + protected boolean nextInner() throws IOException { + try { + if (!resultSet.next()) { + return false; + } + key = ((IgniteStore<K, T>) getDataStore()).extractKey(resultSet); + persistent = ((IgniteStore<K, T>) getDataStore()).newInstance(resultSet, getQuery().getFields()); + return persistent != null; + } catch (SQLException ex) { + throw new IOException(ex); + } + } + + @Override + public float getProgress() throws IOException, InterruptedException { + if (resultSet == null) { + return 0; + } else if (size == 0) { + return 1; + } else { + return offset / (float) size; + } + } + + @Override + public int size() { + return size; + } + + @Override + public void close() throws IOException { + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException ex) { + throw new 
IOException(ex); + } + } + if (st != null) { + try { + st.close(); + } catch (SQLException ex) { + throw new IOException(ex); + } + } + } + + public void setResultSet(ResultSet resultSet) throws SQLException { + this.resultSet = resultSet; + if (resultSet.last()) { + size = resultSet.getRow(); + } else { + size = 0; + } + resultSet.beforeFirst(); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java new file mode 100644 index 0000000..5e463ff --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/query/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Ignite datastore related all classes. 
+ */ +package org.apache.gora.ignite.query; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java index 497e161..d9f3527 100644 --- a/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java @@ -28,19 +28,24 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import javax.sql.rowset.CachedRowSet; +import javax.sql.rowset.RowSetFactory; +import javax.sql.rowset.RowSetProvider; import org.apache.avro.Schema; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.Utf8; +import org.apache.gora.ignite.query.IgniteQuery; +import org.apache.gora.ignite.query.IgniteResult; import org.apache.gora.ignite.utils.IgniteSQLBuilder; import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; import org.apache.gora.util.GoraException; @@ -334,22 +339,72 @@ public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K, T @Override public long deleteByQuery(Query<K, T> query) throws GoraException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | 
Templates. + + String deleteQuery; + if (query.getFields() != null && query.getFields().length < igniteMapping.getFields().size()) { + List<String> dbFields = new ArrayList<>(); + for (String af : query.getFields()) { + dbFields.add(igniteMapping.getFields().get(af).getName()); + } + deleteQuery = IgniteSQLBuilder.deleteQueryFields(igniteMapping, dbFields); + } else { + deleteQuery = IgniteSQLBuilder.deleteQuery(igniteMapping); + } + String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, query.getStartKey(), query.getEndKey(), query.getLimit()); + try (PreparedStatement stmt = connection.prepareStatement(deleteQuery + selectQueryWhere)) { + IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey()); + stmt.executeUpdate(); + return 0; + } catch (SQLException ex) { + throw new GoraException(ex); + } } @Override public Result<K, T> execute(Query<K, T> query) throws GoraException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ String[] fields = getFieldsToQuery(query.getFields()); + //Avro fields to Ignite fields + List<String> dbFields = new ArrayList<>(); + for (String af : fields) { + dbFields.add(igniteMapping.getFields().get(af).getName()); + } + String selectQuery = IgniteSQLBuilder.selectQuery(igniteMapping, dbFields); + String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, query.getStartKey(), query.getEndKey(), query.getLimit()); + try { + PreparedStatement stmt = connection.prepareStatement(selectQuery + selectQueryWhere); + RowSetFactory factory = RowSetProvider.newFactory(); + CachedRowSet rowset = factory.createCachedRowSet(); + IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey()); + ResultSet executeQuery = stmt.executeQuery(); + rowset.populate(executeQuery); + IgniteResult<K, T> igniteResult = new IgniteResult<>(this, query); + igniteResult.setResultSet(rowset); + return igniteResult; + } catch (SQLException ex) { + throw new GoraException(ex); + } + } + + public K extractKey(ResultSet r) throws SQLException { + assert igniteMapping.getPrimaryKey().size() == 1; + return (K) r.getObject(igniteMapping.getPrimaryKey().get(0).getName()); } @Override public Query<K, T> newQuery() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + IgniteQuery<K, T> query = new IgniteQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; } @Override public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(getConf()); + partitions.add(partitionQuery); + return partitions; } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java index 92014c1..4c1e314 100644 --- a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/IgniteSQLBuilder.java @@ -146,4 +146,93 @@ public class IgniteSQLBuilder { st.setObject(j, data[i]); } } + + public static String selectQuery(IgniteMapping mapping, List<String> ifields) { + List<String> fields = new ArrayList<>(); + for (Column c : mapping.getPrimaryKey()) { + fields.add(c.getName()); + } + fields.addAll(ifields); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("SELECT "); + for (int i = 0; i < fields.size(); i++) { + sqlBuilder.append(fields.get(i)); + sqlBuilder.append(i == fields.size() - 1 ? 
"" : " , "); + } + sqlBuilder.append(" FROM "); + sqlBuilder.append(mapping.getTableName()); + return sqlBuilder.toString(); + } + + public static String deleteQuery(IgniteMapping mapping) { + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("DELETE FROM "); + sqlBuilder.append(mapping.getTableName()); + return sqlBuilder.toString(); + } + + public static String deleteQueryFields(IgniteMapping mapping, List<String> lsFields) { + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("UPDATE "); + sqlBuilder.append(mapping.getTableName()); + if (!lsFields.isEmpty()) { + sqlBuilder.append(" SET "); + } + for (int i = 0; i < lsFields.size(); i++) { + sqlBuilder.append(lsFields.get(i)); + sqlBuilder.append(" = null"); + sqlBuilder.append(i == lsFields.size() - 1 ? "" : " , "); + } + return sqlBuilder.toString(); + } + + public static String selectQueryWhere(IgniteMapping mapping, Object sk, Object ek, long limit) { + //composite keys pending + assert mapping.getPrimaryKey().size() == 1; + String keycolumn = mapping.getPrimaryKey().get(0).getName(); + StringBuilder sqlBuilder = new StringBuilder(); + if (sk != null || ek != null) { + sqlBuilder.append(" WHERE "); + if (sk != null && ek != null && sk.equals(ek)) { + sqlBuilder.append(keycolumn); + sqlBuilder.append("= ?"); + } else { + if (sk != null) { + sqlBuilder.append(keycolumn); + sqlBuilder.append(">= ?"); + } + if (sk != null && ek != null) { + sqlBuilder.append(" AND "); + } + if (ek != null) { + sqlBuilder.append(keycolumn); + sqlBuilder.append("<= ?"); + } + } + } + if (limit > 0) { + sqlBuilder.append(" LIMIT " + limit); + } + return sqlBuilder.toString(); + } + + public static void fillSelectQuery(PreparedStatement st, Object sk, Object ek) throws SQLException { + if (sk != null || ek != null) { + if (sk != null && ek != null && sk.equals(ek)) { + st.setObject(1, sk); + } else { + if (sk != null && ek != null) { + st.setObject(1, sk); + st.setObject(2, ek); + } else { + if (sk 
!= null) { + st.setObject(1, sk); + } else { + st.setObject(1, ek); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/87eddc24/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java ---------------------------------------------------------------------- diff --git a/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java new file mode 100644 index 0000000..ce4cd7a --- /dev/null +++ b/gora-ignite/src/main/java/org/apache/gora/ignite/utils/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Ignite datastore related all classes. + */ +package org.apache.gora.ignite.utils; \ No newline at end of file