[
https://issues.apache.org/jira/browse/GORA-535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544361#comment-16544361
]
ASF GitHub Bot commented on GORA-535:
-------------------------------------
Github user lewismc commented on a diff in the pull request:
https://github.com/apache/gora/pull/134#discussion_r202526790
--- Diff:
gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.sql.rowset.CachedRowSet;
+import javax.sql.rowset.RowSetFactory;
+import javax.sql.rowset.RowSetProvider;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.ignite.query.IgniteQuery;
+import org.apache.gora.ignite.query.IgniteResult;
+import org.apache.gora.ignite.utils.IgniteSQLBuilder;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Ignite data store to be used by gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class IgniteStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(IgniteStore.class);
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.ignite.mapping.file";
+ private static final String DEFAULT_MAPPING_FILE =
"gora-ignite-mapping.xml";
+ private IgniteParameters igniteParameters;
+ private IgniteMapping igniteMapping;
+ private Connection connection;
+
+ /*
+ * Create a threadlocal map for the datum readers and writers, because
they
+ * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
When
+ * they are thread safe, it is possible to maintain a single reader and
writer
+ * pair for every schema, instead of one for every thread.
+ */
+ public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>>
readerMap = new ConcurrentHashMap<>();
+
+ public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>>
writerMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) throws GoraException {
+
+ try {
+ super.initialize(keyClass, persistentClass, properties);
+ IgniteMappingBuilder builder = new IgniteMappingBuilder(this);
+ builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE));
+ igniteMapping = builder.getIgniteMapping();
+ igniteParameters = IgniteParameters.load(properties, conf);
+ connection = acquiereConnection();
+ LOG.info("Ignite store was successfully initialized");
+ } catch (ClassNotFoundException | SQLException ex) {
+ LOG.error("Error while initializing Ignite store", ex);
+ throw new GoraException(ex);
+ }
+ }
+
+ private Connection acquiereConnection() throws ClassNotFoundException,
SQLException {
+ Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append("jdbc:ignite:thin://");
+ urlBuilder.append(igniteParameters.getHost());
+ if (igniteParameters.getPort() != null) {
+ urlBuilder.append(":" + igniteParameters.getPort());
+ }
+ if (igniteParameters.getSchema() != null) {
+ urlBuilder.append("/" + igniteParameters.getSchema());
+ }
+ if (igniteParameters.getUser() != null) {
+ urlBuilder.append(";" + igniteParameters.getUser());
+ }
+ if (igniteParameters.getPassword() != null) {
+ urlBuilder.append(";" + igniteParameters.getPassword());
+ }
+ if (igniteParameters.getAdditionalConfigurations() != null) {
+ urlBuilder.append(igniteParameters.getAdditionalConfigurations());
+ }
+ Connection conn = DriverManager.getConnection(urlBuilder.toString());
+ return conn;
+ }
+
+ @Override
+ public String getSchemaName() {
+ return igniteMapping.getTableName();
+ }
+
+ @Override
+ public String getSchemaName(final String mappingSchemaName,
+ final Class<?> persistentClass) {
+ return super.getSchemaName(mappingSchemaName, persistentClass);
+ }
+
+ @Override
+ public void createSchema() throws GoraException {
+ if (connection == null) {
+ throw new GoraException(
+ "Impossible to create the schema as no connection has been
initiated.");
+ }
+ if (schemaExists()) {
+ return;
+ }
+ try (Statement stmt = connection.createStatement()) {
+ String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
+ stmt.executeUpdate(createTableSQL);
+ LOG.info("Table {} has been created for Ignite instance.",
+ igniteMapping.getTableName());
+ } catch (SQLException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public void deleteSchema() throws GoraException {
+ if (connection == null) {
+ throw new GoraException(
+ "Impossible to delete the schema as no connection has been
initiated.");
+ }
+ try (Statement stmt = connection.createStatement()) {
+ String dropTableSQL =
IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
+ stmt.executeUpdate(dropTableSQL);
+ LOG.info("Table {} has been dropped from Ignite instance.",
+ igniteMapping.getTableName());
+ } catch (SQLException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public boolean schemaExists() throws GoraException {
+ boolean exists = false;
+ try (Statement stmt = connection.createStatement()) {
+ String tableExistsSQL =
IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
+ ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
+ executeQuery.close();
+ exists = true;
+ } catch (SQLException ex) {
+ /**
+ * a 42000 error code is thrown by Ignite when a non-existent table
+ * queried. More details:
+ * https://apacheignite-sql.readme.io/docs/jdbc-error-codes
+ */
+ if (ex.getSQLState() != null && ex.getSQLState().equals("42000")) {
+ exists = false;
+ } else {
+ throw new GoraException(ex);
+ }
+ }
+ return exists;
+ }
+
+ @Override
+ public T get(K key, String[] fields) throws GoraException {
+ String[] avFields = getFieldsToQuery(fields);
+ Object[] keyl = null;
+ if (igniteMapping.getPrimaryKey().size() == 1) {
+ keyl = new Object[]{key};
+ } else {
+ //Composite key pending
+ }
+ //Avro fields to Ignite fields
+ List<String> dbFields = new ArrayList<>();
+ for (String af : avFields) {
+ dbFields.add(igniteMapping.getFields().get(af).getName());
+ }
+ String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping,
dbFields);
+ try (PreparedStatement stmt =
connection.prepareStatement(selectQuery)) {
+ IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
+ ResultSet rs = stmt.executeQuery();
+ boolean data = rs.next();
+ T resp = null;
+ if (data) {
+ resp = newInstance(rs, fields);
+ if (rs.next()) {
+ LOG.warn("Multiple results for primary key {} in the schema {},
ignoring additional rows.", keyl, igniteMapping.getTableName());
+ }
+ }
+ rs.close();
+ return resp;
+ } catch (SQLException | IOException ex) {
+ throw new GoraException(ex);
+ }
+
+ }
+
+ public T newInstance(ResultSet rs, String[] fields) throws
GoraException, SQLException, IOException {
+ fields = getFieldsToQuery(fields);
+ T persistent = newPersistent();
+ for (String f : fields) {
+ Schema.Field field = fieldMap.get(f);
+ Schema fieldSchema = field.schema();
+ String dbField = igniteMapping.getFields().get(f).getName();
+ Object sv = rs.getObject(dbField);
+ if (sv == null) {
+ continue;
+ }
+ Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+ persistent.put(field.pos(), v);
+ persistent.setDirty(field.pos());
+ }
+ return persistent;
+ }
+
+ private Object deserializeFieldValue(Schema.Field field, Schema
fieldSchema,
+ Object igniteValue, T persistent) throws IOException {
+ Object fieldValue = null;
+ switch (fieldSchema.getType()) {
+ case MAP:
+ case ARRAY:
+ case RECORD:
+ @SuppressWarnings("rawtypes") SpecificDatumReader reader =
getDatumReader(fieldSchema);
+ fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
--- End diff --
Type safety: Unchecked invocation deserialize(byte[], SpecificDatumReader,
Object) of the generic method deserialize(byte[], SpecificDatumReader<T>, T)
of type IOUtils
> Add a data store for Apache Ignite
> -----------------------------------
>
> Key: GORA-535
> URL: https://issues.apache.org/jira/browse/GORA-535
> Project: Apache Gora
> Issue Type: New Feature
> Reporter: Nishadi Kirielle
> Priority: Major
> Labels: gsoc2018
>
> Currently, Gora supports persisting objects to various database models
> such as Apache HBase, Apache Cassandra and many more. [1] This project aims
> to extend that capability by adding support for the Apache Ignite database.
> Apache Ignite is a distributed database, caching and processing platform. [2]
> [1]. [http://gora.apache.org/]
> [2]. [https://ignite.apache.org/]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)