[
https://issues.apache.org/jira/browse/GORA-535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544458#comment-16544458
]
ASF GitHub Bot commented on GORA-535:
-------------------------------------
Github user nishadi commented on a diff in the pull request:
https://github.com/apache/gora/pull/134#discussion_r202534472
--- Diff:
gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.sql.rowset.CachedRowSet;
+import javax.sql.rowset.RowSetFactory;
+import javax.sql.rowset.RowSetProvider;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.ignite.query.IgniteQuery;
+import org.apache.gora.ignite.query.IgniteResult;
+import org.apache.gora.ignite.utils.IgniteSQLBuilder;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an Ignite data store to be used by Gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class IgniteStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(IgniteStore.class);
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.ignite.mapping.file"; // configuration key that names the mapping file
+ private static final String DEFAULT_MAPPING_FILE =
"gora-ignite-mapping.xml"; // mapping file used when the key above is not set
+ private IgniteParameters igniteParameters; // loaded in initialize()
+ private IgniteMapping igniteMapping; // built from the mapping file in initialize()
+ private Connection connection; // assigned in initialize(), closed in close()
+
+ /*
+ * Caches for the Avro datum readers and writers, keyed by schema.
+ *
+ * The original note reads: create a threadlocal map for the datum readers
+ * and writers, because they are not thread safe, at least not before Avro
+ * 1.4.0 (See AVRO-650). When they are thread safe, it is possible to
+ * maintain a single reader and writer pair for every schema, instead of one
+ * for every thread.
+ *
+ * NOTE(review): the fields declared below are static ConcurrentHashMaps
+ * shared by all threads, not thread-local maps - confirm which one is
+ * intended.
+ */
+ public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>>
readerMap = new ConcurrentHashMap<>();
+
+ public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>>
writerMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) throws GoraException {
+
+ try {
+ super.initialize(keyClass, persistentClass, properties);
+ IgniteMappingBuilder builder = new IgniteMappingBuilder(this);
+ builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE));
+ igniteMapping = builder.getIgniteMapping();
+ igniteParameters = IgniteParameters.load(properties, conf);
+ connection = acquiereConnection();
+ LOG.info("Ignite store was successfully initialized");
+ } catch (ClassNotFoundException | SQLException ex) {
+ LOG.error("Error while initializing Ignite store", ex);
+ throw new GoraException(ex);
+ }
+ }
+
+  /**
+   * Loads the Ignite thin JDBC driver, assembles the connection URL from the
+   * loaded Ignite parameters and opens a connection through DriverManager.
+   * Optional parameters (port, schema, user, password, additional
+   * configurations) are appended only when they are not null.
+   *
+   * @return the opened connection
+   * @throws ClassNotFoundException if the driver class cannot be found
+   * @throws SQLException if DriverManager cannot open the connection
+   */
+  private Connection acquiereConnection() throws ClassNotFoundException,
+      SQLException {
+    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+    StringBuilder url = new StringBuilder();
+    url.append("jdbc:ignite:thin://").append(igniteParameters.getHost());
+    if (igniteParameters.getPort() != null) {
+      url.append(":").append(igniteParameters.getPort());
+    }
+    if (igniteParameters.getSchema() != null) {
+      url.append("/").append(igniteParameters.getSchema());
+    }
+    if (igniteParameters.getUser() != null) {
+      url.append(";").append(igniteParameters.getUser());
+    }
+    if (igniteParameters.getPassword() != null) {
+      url.append(";").append(igniteParameters.getPassword());
+    }
+    if (igniteParameters.getAdditionalConfigurations() != null) {
+      url.append(igniteParameters.getAdditionalConfigurations());
+    }
+    return DriverManager.getConnection(url.toString());
+  }
+
+ @Override
+ public String getSchemaName() {
+ return igniteMapping.getTableName();
+ }
+
+  /**
+   * Delegates the schema name resolution to the parent implementation.
+   *
+   * @param mappingSchemaName schema name passed through to the parent
+   * @param persistentClass persistent class passed through to the parent
+   * @return whatever the parent implementation returns
+   */
+  @Override
+  public String getSchemaName(final String mappingSchemaName,
+      final Class<?> persistentClass) {
+    final String resolved = super.getSchemaName(mappingSchemaName, persistentClass);
+    return resolved;
+  }
+
+  /**
+   * Creates the mapped table unless it already exists.
+   *
+   * @throws GoraException if no connection is available or the CREATE
+   *         statement fails
+   */
+  @Override
+  public void createSchema() throws GoraException {
+    if (connection == null) {
+      throw new GoraException(
+          "Impossible to create the schema as no connection has been initiated.");
+    }
+    if (!schemaExists()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate(IgniteSQLBuilder.createTable(igniteMapping));
+        LOG.info("Table {} has been created for Ignite instance.",
+            igniteMapping.getTableName());
+      } catch (SQLException sqlFailure) {
+        throw new GoraException(sqlFailure);
+      }
+    }
+  }
+
+  /**
+   * Drops the mapped table.
+   *
+   * @throws GoraException if no connection is available or the DROP
+   *         statement fails
+   */
+  @Override
+  public void deleteSchema() throws GoraException {
+    if (connection == null) {
+      throw new GoraException(
+          "Impossible to delete the schema as no connection has been initiated.");
+    }
+    try (Statement statement = connection.createStatement()) {
+      statement.executeUpdate(IgniteSQLBuilder.dropTable(igniteMapping.getTableName()));
+      LOG.info("Table {} has been dropped from Ignite instance.",
+          igniteMapping.getTableName());
+    } catch (SQLException sqlFailure) {
+      throw new GoraException(sqlFailure);
+    }
+  }
+
+  /**
+   * Checks whether the mapped table exists by running the table-exists query
+   * built by IgniteSQLBuilder.
+   *
+   * @return true if the query succeeds, false if it fails with SQL state 42000
+   * @throws GoraException if no connection is available or the query fails
+   *         with any other SQL state
+   */
+  @Override
+  public boolean schemaExists() throws GoraException {
+    // Same guard as createSchema()/deleteSchema(): fail with a clear message
+    // instead of a NullPointerException when the store was never connected.
+    if (connection == null) {
+      throw new GoraException(
+          "Impossible to check the schema as no connection has been initiated.");
+    }
+    try (Statement stmt = connection.createStatement()) {
+      String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
+      // Only the success of the query matters; the rows are not read.
+      try (ResultSet ignored = stmt.executeQuery(tableExistsSQL)) {
+        return true;
+      }
+    } catch (SQLException ex) {
+      /*
+       * a 42000 error code is thrown by Ignite when a non-existent table is
+       * queried. More details:
+       * https://apacheignite-sql.readme.io/docs/jdbc-error-codes
+       */
+      if ("42000".equals(ex.getSQLState())) {
+        return false;
+      }
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Reads the object stored under the given key.
+   *
+   * @param key primary key value
+   * @param fields Avro field names to load, resolved through getFieldsToQuery
+   * @return the loaded object, or null when no row matches the key
+   * @throws GoraException if the mapping declares a composite primary key
+   *         (not implemented yet) or the query or deserialization fails
+   */
+  @Override
+  public T get(K key, String[] fields) throws GoraException {
+    // Composite keys are still pending; fail explicitly rather than handing a
+    // null key array to the statement filler.
+    if (igniteMapping.getPrimaryKey().size() != 1) {
+      throw new GoraException("Composite primary keys are not supported yet.");
+    }
+    Object[] keyl = new Object[]{key};
+    String[] avFields = getFieldsToQuery(fields);
+    //Avro fields to Ignite fields
+    List<String> dbFields = new ArrayList<>();
+    for (String af : avFields) {
+      dbFields.add(igniteMapping.getFields().get(af).getName());
+    }
+    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, dbFields);
+    try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
+      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
+      try (ResultSet rs = stmt.executeQuery()) {
+        if (!rs.next()) {
+          return null;
+        }
+        T resp = newInstance(rs, fields);
+        if (rs.next()) {
+          LOG.warn("Multiple results for primary key {} in the schema {}, ignoring additional rows.",
+              key, igniteMapping.getTableName());
+        }
+        return resp;
+      }
+    } catch (GoraException ex) {
+      // Already the declared exception type; do not wrap it a second time.
+      throw ex;
+    } catch (SQLException | IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+ public T newInstance(ResultSet rs, String[] fields) throws
GoraException, SQLException, IOException {
+ fields = getFieldsToQuery(fields);
+ T persistent = newPersistent();
+ for (String f : fields) {
+ Schema.Field field = fieldMap.get(f);
+ Schema fieldSchema = field.schema();
+ String dbField = igniteMapping.getFields().get(f).getName();
+ Object sv = rs.getObject(dbField);
+ if (sv == null) {
+ continue;
+ }
+ Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+ persistent.put(field.pos(), v);
+ persistent.setDirty(field.pos());
+ }
+ return persistent;
+ }
+
+  /**
+   * Converts a value read from Ignite into the representation expected for
+   * the given Avro schema.
+   *
+   * NOTE(review): the FIXED branch yields null, i.e. a stored FIXED value is
+   * not converted here - confirm whether that is intended.
+   *
+   * @param field Avro field being populated
+   * @param fieldSchema schema to deserialize against (a union member when
+   *        called recursively)
+   * @param igniteValue value read from the result set
+   * @param persistent object being populated; its current field value is
+   *        passed to the binary deserializer
+   * @return the converted value
+   */
+  private Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
+      Object igniteValue, T persistent) throws IOException {
+    switch (fieldSchema.getType()) {
+      case MAP:
+      case ARRAY:
+      case RECORD: {
+        @SuppressWarnings("rawtypes")
+        SpecificDatumReader complexReader = getDatumReader(fieldSchema);
+        return IOUtils.deserialize((byte[]) igniteValue, complexReader,
+            persistent.get(field.pos()));
+      }
+      case ENUM:
+        return AvroUtils.getEnumValue(fieldSchema, igniteValue.toString());
+      case FIXED:
+        return null;
+      case BYTES:
+        return ByteBuffer.wrap((byte[]) igniteValue);
+      case STRING:
+        return new Utf8(igniteValue.toString());
+      case UNION: {
+        boolean nullablePair = fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema);
+        if (nullablePair) {
+          // Two-member nullable union: deserialize against the selected member.
+          int schemaPos = getUnionSchema(igniteValue, fieldSchema);
+          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+          return deserializeFieldValue(field, unionSchema, igniteValue, persistent);
+        }
+        @SuppressWarnings("rawtypes")
+        SpecificDatumReader unionReader = getDatumReader(fieldSchema);
+        return IOUtils.deserialize((byte[]) igniteValue, unionReader,
+            persistent.get(field.pos()));
+      }
+      default:
+        return igniteValue;
+    }
+  }
+
+  /**
+   * Writes the object under the given key. Objects that are not dirty are
+   * skipped with an informational log entry. Only fields that have a column
+   * mapping and a non-null value are written.
+   *
+   * @param key primary key value
+   * @param obj object to persist
+   * @throws GoraException if the mapping declares a composite primary key
+   *         (not implemented yet) or if anything fails while serializing or
+   *         executing the statement
+   */
+  @Override
+  public void put(K key, T obj) throws GoraException {
+    try {
+      if (!obj.isDirty()) {
+        LOG.info("Ignored putting object {} in the store as it is neither "
+            + "new, neither dirty.", obj);
+        return;
+      }
+      // Composite keys are still pending; refuse instead of building an
+      // insert statement that carries no key column at all.
+      if (igniteMapping.getPrimaryKey().size() != 1) {
+        throw new GoraException("Composite primary keys are not supported yet.");
+      }
+      Map<Column, Object> data = new HashMap<>();
+      data.put(igniteMapping.getPrimaryKey().get(0), key);
+      for (Schema.Field field : obj.getSchema().getFields()) {
+        Column column = igniteMapping.getFields().get(field.name());
+        Object value = obj.get(field.pos());
+        if (column != null && value != null) {
+          data.put(column, serializeFieldValue(column, field.schema(), value));
+        }
+      }
+      String baseInsertStatement = IgniteSQLBuilder.baseInsertStatement(igniteMapping, data);
+      try (PreparedStatement stmt = connection.prepareStatement(baseInsertStatement)) {
+        IgniteSQLBuilder.fillInsertStatement(stmt, data);
+        stmt.executeUpdate();
+      }
+    } catch (GoraException e) {
+      // Already the declared exception type; do not wrap it a second time.
+      throw e;
+    } catch (Exception e) {
+      // Wraps SQL failures and any runtime failure exactly once.
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * Deletes the row stored under the given key.
+   *
+   * @param key primary key value
+   * @return true once the delete statement has executed without error
+   * @throws GoraException if the mapping declares a composite primary key
+   *         (not implemented yet) or the statement fails
+   */
+  @Override
+  public boolean delete(K key) throws GoraException {
+    // Composite keys are still pending; fail explicitly rather than passing a
+    // null SQL string to prepareStatement.
+    if (igniteMapping.getPrimaryKey().size() != 1) {
+      throw new GoraException("Composite primary keys are not supported yet.");
+    }
+    String deleteQuery = IgniteSQLBuilder.delete(igniteMapping);
+    Object[] keyl = new Object[]{key};
+    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery)) {
+      IgniteSQLBuilder.fillDeleteStatement(stmt, igniteMapping, keyl);
+      stmt.executeUpdate();
+      return true;
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Executes a delete for the key range of the query. When the query names
+   * fewer fields than the mapping holds, the field-level statement from
+   * IgniteSQLBuilder.deleteQueryFields is used; otherwise the whole-row
+   * statement from IgniteSQLBuilder.deleteQuery is used.
+   *
+   * @param query query supplying fields, start key, end key and limit
+   * @return the number of rows affected, as reported by the JDBC driver
+   *         (NOTE(review): for the field-level statement this presumably
+   *         counts modified rather than removed rows - confirm)
+   * @throws GoraException if the statement fails
+   */
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws GoraException {
+    String deleteQuery;
+    if (query.getFields() != null
+        && query.getFields().length < igniteMapping.getFields().size()) {
+      List<String> dbFields = new ArrayList<>();
+      for (String af : query.getFields()) {
+        dbFields.add(igniteMapping.getFields().get(af).getName());
+      }
+      deleteQuery = IgniteSQLBuilder.deleteQueryFields(igniteMapping, dbFields);
+    } else {
+      deleteQuery = IgniteSQLBuilder.deleteQuery(igniteMapping);
+    }
+    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping,
+        query.getStartKey(), query.getEndKey(), query.getLimit());
+    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery
+        + selectQueryWhere)) {
+      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey());
+      // Report the driver's update count instead of a hard-coded 0.
+      return stmt.executeUpdate();
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Runs the query and returns its rows wrapped in an IgniteResult. The rows
+   * are copied into a CachedRowSet, so the statement and its result set are
+   * closed before this method returns.
+   *
+   * @param query query supplying fields, start key, end key and limit
+   * @return result backed by the populated CachedRowSet
+   * @throws GoraException if preparing, executing or copying the query fails
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws GoraException {
+    String[] fields = getFieldsToQuery(query.getFields());
+    //Avro fields to Ignite fields
+    List<String> dbFields = new ArrayList<>();
+    for (String af : fields) {
+      dbFields.add(igniteMapping.getFields().get(af).getName());
+    }
+    String selectQuery = IgniteSQLBuilder.selectQuery(igniteMapping, dbFields);
+    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping,
+        query.getStartKey(), query.getEndKey(), query.getLimit());
+    // try-with-resources: the statement and result set were previously never
+    // closed, leaking one of each per executed query.
+    try (PreparedStatement stmt = connection.prepareStatement(selectQuery
+        + selectQueryWhere)) {
+      RowSetFactory factory = RowSetProvider.newFactory();
+      CachedRowSet rowset = factory.createCachedRowSet();
+      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey());
+      try (ResultSet executeQuery = stmt.executeQuery()) {
+        rowset.populate(executeQuery);
+      }
+      IgniteResult<K, T> igniteResult = new IgniteResult<>(this, query);
+      igniteResult.setResultSet(rowset);
+      return igniteResult;
+    } catch (SQLException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Reads the key of the current row from the single primary key column of
+   * the mapping.
+   *
+   * @param r result set positioned on the row to read
+   * @return the column value cast to the key type
+   */
+  @SuppressWarnings("unchecked")
+  public K extractKey(ResultSet r) throws SQLException {
+    assert igniteMapping.getPrimaryKey().size() == 1;
+    String keyColumn = igniteMapping.getPrimaryKey().get(0).getName();
+    Object rawKey = r.getObject(keyColumn);
+    return (K) rawKey;
+  }
+
+ @Override
+ public Query<K, T> newQuery() {
+ IgniteQuery<K, T> query = new IgniteQuery<>(this);
+ query.setFields(getFieldsToQuery(null));
+ return query;
+ }
+
+  /**
+   * Returns a list holding a single partition query that wraps the given
+   * query and carries this store's configuration.
+   *
+   * @param query query to wrap
+   * @return a one-element list of partition queries
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+      throws IOException {
+    PartitionQueryImpl<K, T> wholeRange = new PartitionQueryImpl<>(query);
+    wholeRange.setConf(getConf());
+    List<PartitionQuery<K, T>> result = new ArrayList<>();
+    result.add(wholeRange);
+    return result;
+  }
+
+  /**
+   * No-op: nothing is buffered by this store.
+   */
+  @Override
+  public void flush() throws GoraException {
+    // Nothing to do here: auto-commit mode is used by default.
+  }
+
+  /**
+   * Closes the JDBC connection. Failures are logged and not rethrown. Does
+   * nothing when no connection was ever opened.
+   */
+  @Override
+  public void close() {
+    // initialize() may have failed before the connection was assigned; avoid
+    // a NullPointerException that would be logged as a close error.
+    if (connection == null) {
+      return;
+    }
+    try {
+      connection.close();
+      LOG.info("Ignite datastore destroyed successfully.");
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+  }
+
+ private Object serializeFieldValue(Column get, Schema fieldSchema,
Object fieldValue) {
--- End diff --
Is there a specific reason why this method accepts the parameter 'get',
given that it is not used?
> Add a data store for Apache Ignite
> -----------------------------------
>
> Key: GORA-535
> URL: https://issues.apache.org/jira/browse/GORA-535
> Project: Apache Gora
> Issue Type: New Feature
> Reporter: Nishadi Kirielle
> Priority: Major
> Labels: gsoc2018
>
> Currently, Gora supports persisting objects to various database models
> such as Apache HBase, Apache Cassandra and many more [1]. This project aims
> to extend its capability to provide support for the Apache Ignite database.
> Apache Ignite is a distributed database, caching and processing platform [2].
> [1] [http://gora.apache.org/]
> [2] [https://ignite.apache.org/]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)