[
https://issues.apache.org/jira/browse/GORA-535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587242#comment-16587242
]
ASF GitHub Bot commented on GORA-535:
-------------------------------------
Github user djkevincr commented on a diff in the pull request:
https://github.com/apache/gora/pull/134#discussion_r211541050
--- Diff:
gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.ignite.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.sql.rowset.CachedRowSet;
+import javax.sql.rowset.RowSetFactory;
+import javax.sql.rowset.RowSetProvider;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.ignite.query.IgniteQuery;
+import org.apache.gora.ignite.query.IgniteResult;
+import org.apache.gora.ignite.utils.IgniteBackendConstants;
+import org.apache.gora.ignite.utils.IgniteSQLBuilder;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an Ignite data store to be used by Gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class IgniteStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ // Class-wide SLF4J logger.
+ private static final Logger LOG =
LoggerFactory.getLogger(IgniteStore.class);
+ // Configuration key that initialize() looks up to find the mapping file name.
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.ignite.mapping.file";
+ // Mapping file name used by initialize() when the key above is not configured.
+ private static final String DEFAULT_MAPPING_FILE =
"gora-ignite-mapping.xml";
+ // Connection settings; assigned in initialize() from IgniteParameters.load(properties).
+ private IgniteParameters igniteParameters;
+ // Table/field mapping; assigned in initialize() from the mapping builder.
+ private IgniteMapping igniteMapping;
+ // JDBC connection; assigned in initialize() via acquireConnection(), null until then.
+ private Connection connection;
+ // Static maps from Avro Schema to datum reader; not used in the visible part of the
+ // class -- presumably a per-schema cache for (de)serialization, confirm against usages.
+ private static final ConcurrentHashMap<Schema, SpecificDatumReader<?>>
readerMap = new ConcurrentHashMap<>();
+ // Writer counterpart of readerMap (same caveat: usage is outside this excerpt).
+ private static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>>
writerMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) throws GoraException {
+ try {
+ super.initialize(keyClass, persistentClass, properties);
+ IgniteMappingBuilder<K, T> builder = new IgniteMappingBuilder<K,
T>(this);
+ builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE));
+ igniteMapping = builder.getIgniteMapping();
+ igniteParameters = IgniteParameters.load(properties);
+ connection = acquireConnection();
+ LOG.info("Ignite store was successfully initialized");
+ } catch (ClassNotFoundException | SQLException ex) {
+ LOG.error("Error while initializing Ignite store", ex);
+ throw new GoraException(ex);
+ }
+ }
+
+ private Connection acquireConnection() throws ClassNotFoundException,
SQLException {
+ Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+ StringBuilder urlBuilder = new StringBuilder();
+ urlBuilder.append("jdbc:ignite:thin://");
+ urlBuilder.append(igniteParameters.getHost());
+ if (igniteParameters.getPort() != null) {
+ urlBuilder.append(":" + igniteParameters.getPort());
+ }
+ if (igniteParameters.getSchema() != null) {
+ urlBuilder.append("/" + igniteParameters.getSchema());
+ }
+ if (igniteParameters.getUser() != null) {
+ urlBuilder.append(";" + igniteParameters.getUser());
+ }
+ if (igniteParameters.getPassword() != null) {
+ urlBuilder.append(";" + igniteParameters.getPassword());
+ }
+ if (igniteParameters.getAdditionalConfigurations() != null) {
+ urlBuilder.append(igniteParameters.getAdditionalConfigurations());
+ }
+ return DriverManager.getConnection(urlBuilder.toString());
+ }
+
+ /**
+  * Returns the table name taken from the Ignite mapping.
+  *
+  * @return the mapped table name
+  */
+ @Override
+ public String getSchemaName() {
+   final String tableName = igniteMapping.getTableName();
+   return tableName;
+ }
+
+ /**
+  * Resolves the schema name by delegating unchanged to the base-class implementation.
+  *
+  * @param mappingSchemaName schema name coming from the mapping
+  * @param persistentClass class of the persisted objects
+  * @return whatever the superclass resolves for the given arguments
+  */
+ @Override
+ public String getSchemaName(final String mappingSchemaName,
+     final Class<?> persistentClass) {
+   final String resolvedName = super.getSchemaName(mappingSchemaName, persistentClass);
+   return resolvedName;
+ }
+
+ /**
+  * Creates the mapped table unless it already exists.
+  *
+  * @throws GoraException if no connection is available or the SQL statement fails
+  */
+ @Override
+ public void createSchema() throws GoraException {
+   if (connection == null) {
+     throw new GoraException(
+         "Impossible to create the schema as no connection has been initiated.");
+   }
+   if (!schemaExists()) {
+     try (Statement statement = connection.createStatement()) {
+       final String ddl = IgniteSQLBuilder.createTable(igniteMapping);
+       statement.executeUpdate(ddl);
+       LOG.info("Table {} has been created for Ignite instance.",
+           igniteMapping.getTableName());
+     } catch (SQLException e) {
+       throw new GoraException(e);
+     }
+   }
+ }
+
+ /**
+  * Drops the mapped table by executing the statement built by
+  * {@code IgniteSQLBuilder.dropTable}.
+  *
+  * @throws GoraException if no connection is available or the SQL statement fails
+  */
+ @Override
+ public void deleteSchema() throws GoraException {
+   if (connection == null) {
+     throw new GoraException(
+         "Impossible to delete the schema as no connection has been initiated.");
+   }
+   try (Statement statement = connection.createStatement()) {
+     final String ddl = IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
+     statement.executeUpdate(ddl);
+     LOG.info("Table {} has been dropped from Ignite instance.",
+         igniteMapping.getTableName());
+   } catch (SQLException e) {
+     throw new GoraException(e);
+   }
+ }
+
+ /**
+  * Checks whether the mapped table exists by running the probe query built by
+  * {@code IgniteSQLBuilder.tableExists}. A successful query means the table exists;
+  * a {@link SQLException} whose SQL state equals
+  * {@code IgniteBackendConstants.DEFAULT_IGNITE_TABLE_NOT_EXISTS_CODE} means it does not.
+  *
+  * @return {@code true} if the probe query succeeds, {@code false} if it fails with the
+  *         "table does not exist" SQL state
+  * @throws GoraException if no connection is available or any other SQL error occurs
+  */
+ @Override
+ public boolean schemaExists() throws GoraException {
+   // Same guard as createSchema()/deleteSchema(): fail with a GoraException, not an NPE.
+   if (connection == null) {
+     throw new GoraException(
+         "Impossible to check the schema as no connection has been initiated.");
+   }
+   try (Statement stmt = connection.createStatement()) {
+     String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
+     // Only success/failure of the query matters; the rows themselves are never read.
+     try (ResultSet ignored = stmt.executeQuery(tableExistsSQL)) {
+       return true;
+     }
+   } catch (SQLException ex) {
+     String sqlState = ex.getSQLState();
+     if (sqlState != null
+         && sqlState.equals(IgniteBackendConstants.DEFAULT_IGNITE_TABLE_NOT_EXISTS_CODE)) {
+       return false;
+     }
+     throw new GoraException(ex);
+   }
+ }
+
+ @Override
+ public T get(K key, String[] fields) throws GoraException {
+ String[] avFields = getFieldsToQuery(fields);
+ Object[] keyl = null;
+ if (igniteMapping.getPrimaryKey().size() == 1) {
+ keyl = new Object[]{key};
+ } else {
+ //Composite key pending
+ }
+ //Avro fields to Ignite fields
+ List<String> dbFields = new ArrayList<>();
+ for (String af : avFields) {
+ dbFields.add(igniteMapping.getFields().get(af).getName());
+ }
+ String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping,
dbFields);
+ try (PreparedStatement stmt =
connection.prepareStatement(selectQuery)) {
+ IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
+ ResultSet rs = stmt.executeQuery();
+ boolean data = rs.next();
+ T resp = null;
+ if (data) {
+ resp = newInstance(rs, fields);
+ if (rs.next()) {
+ LOG.warn("Multiple results for primary key {} in the schema {},
ignoring additional rows.", keyl, igniteMapping.getTableName());
+ }
+ }
+ rs.close();
+ return resp;
+ } catch (SQLException | IOException ex) {
+ throw new GoraException(ex);
+ }
+
--- End diff --
Please remove unnecessary new lines.
> Add a data store for Apache Ignite
> -----------------------------------
>
> Key: GORA-535
> URL: https://issues.apache.org/jira/browse/GORA-535
> Project: Apache Gora
> Issue Type: New Feature
> Reporter: Nishadi Kirielle
> Priority: Major
> Labels: gsoc2018
>
> Currently, Gora has support for persisting objects to various database models
> such as Apache HBase, Apache Cassandra and many more [1]. This project aims
> to extend its capability to provide support for the Apache Ignite database.
> Apache Ignite is a distributed database, caching and processing platform [2].
> [1]. [http://gora.apache.org/]
> [2]. [https://ignite.apache.org/]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)