This is an automated email from the ASF dual-hosted git repository. djkevincr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new 504833f GORA-320 Hive backend support in Gora (#187) 504833f is described below commit 504833f99f9bffd3b92bd46a48daebdabf1ccd57 Author: Chanaka Balasooriya <chanaka...@gmail.com> AuthorDate: Tue Aug 27 21:39:55 2019 +0530 GORA-320 Hive backend support in Gora (#187) * GORA 320 Hive backend support for Gora * Add type safety issues * Configure hive embedded server for testing * Support delete queries in gora-hive * Moving hive dependencies to parent pom * Add hive test server * Resolve hive dependency issues * Remove gora-hive test profile --- .../org/apache/gora/store/DataStoreTestUtil.java | 10 +- gora-hive/pom.xml | 217 +++++++++ .../java/org/apache/gora/hive/package-info.java | 21 + .../java/org/apache/gora/hive/query/HiveQuery.java | 40 ++ .../org/apache/gora/hive/query/HiveResult.java | 79 ++++ .../org/apache/gora/hive/query/package-info.java | 21 + .../apache/gora/hive/store/HiveDataContext.java | 256 ++++++++++ .../org/apache/gora/hive/store/HiveMapping.java | 95 ++++ .../apache/gora/hive/store/HiveMappingBuilder.java | 157 +++++++ .../java/org/apache/gora/hive/store/HiveStore.java | 374 +++++++++++++++ .../gora/hive/store/HiveStoreParameters.java | 123 +++++ .../org/apache/gora/hive/store/package-info.java | 21 + .../apache/gora/hive/util/HiveQueryBuilder.java | 521 +++++++++++++++++++++ .../apache/gora/hive/util/HiveResultParser.java | 243 ++++++++++ .../org/apache/gora/hive/util/package-info.java | 22 + .../org/apache/gora/hive/GoraHiveTestDriver.java | 49 ++ .../java/org/apache/gora/hive/package-info.java | 21 + .../org/apache/gora/hive/store/TestHiveStore.java | 171 +++++++ .../org/apache/gora/hive/store/package-info.java | 21 + .../org/apache/gora/hive/util/HiveTestServer.java | 109 +++++ .../org/apache/gora/hive/util/package-info.java | 21 + gora-hive/src/test/resources/gora-hive-mapping.xml | 45 ++ gora-hive/src/test/resources/gora.properties | 28 ++ 
gora-hive/src/test/resources/hive-site.xml | 85 ++++ pom.xml | 75 ++- 25 files changed, 2817 insertions(+), 8 deletions(-) diff --git a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java index bb935ba..a452999 100644 --- a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java +++ b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java @@ -105,7 +105,7 @@ public class DataStoreTestUtil { return employee; } - private static <K> WebPage createWebPage() { + public static <K> WebPage createWebPage() { WebPage webpage = WebPage.newBuilder().build(); webpage.setUrl(new Utf8("url..")); webpage.setContent(ByteBuffer.wrap("test content".getBytes(Charset.defaultCharset()))); @@ -338,7 +338,7 @@ public class DataStoreTestUtil { * @param employee * @param after */ - private static void assertEqualEmployeeObjects(Employee employee, Employee after) { + public static void assertEqualEmployeeObjects(Employee employee, Employee after) { //for (int i = 1; i < employee.SCHEMA$.getFields().size(); i++) { // for (int j = 1; j < after.SCHEMA$.getFields().size(); j++) { // assertEquals(employee.SCHEMA$.getFields().get(i), after.SCHEMA$.getFields().get(j)); @@ -388,7 +388,7 @@ public class DataStoreTestUtil { * @param beforeWebPage * @param afterWebPage */ - private static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage afterWebPage) { + public static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage afterWebPage) { //check url field CharSequence beforeUrl = beforeWebPage.getUrl(); CharSequence afterUrl = afterWebPage.getUrl(); @@ -424,7 +424,7 @@ public class DataStoreTestUtil { * @param beforeMetadata * @param afterMetadata */ - private static void assertEqualMetadataObjects(Metadata beforeMetadata, Metadata afterMetadata) { + public static void assertEqualMetadataObjects(Metadata beforeMetadata, Metadata afterMetadata) { //check version field 
int beforeVersion = beforeMetadata.getVersion(); int afterVersion = afterMetadata.getVersion(); @@ -767,7 +767,7 @@ public class DataStoreTestUtil { " actual=" + CONTENTS[i] + " i=" + i , Arrays.equals( toByteArray(page.getContent() ) , CONTENTS[i].getBytes(Charset.defaultCharset()))); - + List<CharSequence> parsedContent = page.getParsedContent(); assertNotNull(parsedContent); assertTrue(parsedContent.size() > 0); diff --git a/gora-hive/pom.xml b/gora-hive/pom.xml new file mode 100644 index 0000000..e1ae565 --- /dev/null +++ b/gora-hive/pom.xml @@ -0,0 +1,217 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + <artifactId>gora-hive</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: Hive</name> + <url>http://gora.apache.org</url> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support. + </description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> + + <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.hive*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> 
+ <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom</artifactId> + </dependency> + + <!-- Logging Dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + </dependency> + + <!-- 2.6 version of hadoop-common is defined only for gora-hive + as it creates some backward compatibility issues by defining in parent--> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-common</artifactId> + <version>${hadoop-common.version}</version> + </dependency> + + <!-- Hive Dependencies --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-jdbc</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + </dependency> + + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/package-info.java b/gora-hive/src/main/java/org/apache/gora/hive/package-info.java new file mode 100644 index 0000000..654732e --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all Hive store related classes + */ +package org.apache.gora.hive; \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java new file mode 100644 index 0000000..7f587ea --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveQuery.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.hive.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * HiveDataStore specific implementation of the {@link org.apache.gora.query.Query} interface. 
+ * @param <K> Key Class Type + * @param <T> Persistence Class Type + * + */ +public class HiveQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + + public HiveQuery() { + super(null); + } + + public HiveQuery(DataStore<K, T> dataStore) { + super(dataStore); + } +} diff --git a/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java new file mode 100644 index 0000000..203d498 --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/query/HiveResult.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.hive.query; + +import java.io.IOException; +import java.util.List; +import org.apache.gora.hive.store.HiveStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.Row; + +/** + * Hive Query Result implementation of the {@link org.apache.gora.query.Result} interface. 
+ */ +public class HiveResult<K, T extends PersistentBase> extends ResultBase<K, T> { + + private List<Row> results; + private int currentRow= 0 ; + + public HiveResult(HiveStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + public HiveResult(HiveStore<K, T> dataStore, Query<K, T> query, DataSet dataSet) { + super(dataStore, query); + results = dataSet.toRows(); + currentRow = 0; + } + + @Override + public HiveStore<K, T> getDataStore() { + return (HiveStore<K, T>) super.getDataStore(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return currentRow/(size()+1.0f); + } + + @Override + public int size() { + return (results == null) ? 0 : results.size(); + } + + @Override + protected boolean nextInner() throws IOException { + try { + if (results == null || currentRow == results.size()){ + return false; + } + HiveStore<K, T> hiveStore = ((HiveStore<K, T>) dataStore); + Row nextRow = results.get(currentRow); + key = hiveStore.readKey(nextRow); + persistent = hiveStore.readObject(nextRow); + currentRow++; + return true; + } catch (Exception ex) { + throw new IOException(ex); + } + } +} diff --git a/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java b/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java new file mode 100644 index 0000000..fa1e33c --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/query/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all Hive query and result representing classes + */ +package org.apache.gora.hive.query; \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java new file mode 100644 index 0000000..256d1af --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveDataContext.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.store; + + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.gora.util.GoraException; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateSummary; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.factory.DataContextPropertiesImpl; +import org.apache.metamodel.jdbc.JdbcDataContext; +import org.apache.metamodel.jdbc.JdbcDataContextFactory; +import org.apache.metamodel.query.CompiledQuery; +import org.apache.metamodel.query.Query; +import org.apache.metamodel.query.builder.InitFromBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread safe implementation to query on hive data context This implements all methods of + * {@link org.apache.metamodel.DataContext} and {@link org.apache.metamodel.UpdateableDataContext} + * and methods to establish a HiveConnection and execute row hive sql strings on that connection + */ +public class HiveDataContext implements DataContext, UpdateableDataContext { + + private static final Logger LOG = LoggerFactory.getLogger((MethodHandles.lookup().lookupClass())); + + private final ThreadLocal<JdbcDataContext> dataContext; + private final DataContextPropertiesImpl dataContextProperties; + private final JdbcDataContextFactory jdbcDataContextFactory; + private final BlockingQueue<JdbcDataContext> dataContextPool = new LinkedBlockingQueue<>(); + // provide if the hive data context is closed and this is used to 
synchronise creating + // and closing jdbc data contexts + private Boolean isClosed; + + public HiveDataContext(HiveStoreParameters hiveStoreParameters) { + jdbcDataContextFactory = new JdbcDataContextFactory(); + dataContextProperties = generateDataContextProperties(hiveStoreParameters); + this.dataContext = new ThreadLocal<>(); + this.isClosed = false; + } + + /** + * Return jdbc data context assigned to the current thread. + * + * @return {@link org.apache.metamodel.jdbc.JdbcDataContext} Jdbc data context + */ + public JdbcDataContext getDataContext() { + // If the data context is not found or the connection is closed, + // a new data context will be created assuming the current thread accesses + // the data context for the first time or the previous connection closed due to some exceptions + // thrown during the last query + + JdbcDataContext jdbcDataContext = dataContext.get(); + boolean connectionClosed = false; + if (jdbcDataContext != null) { + try { + connectionClosed = jdbcDataContext.getConnection().isClosed(); + } catch (SQLException e) { + LOG.error("Checking connection status failed", e); + } + } + synchronized (isClosed) { + if ((jdbcDataContext == null || connectionClosed) && !isClosed) { + jdbcDataContext = (JdbcDataContext) jdbcDataContextFactory + .create(dataContextProperties, null); + dataContext.set(jdbcDataContext); + dataContextPool.add(jdbcDataContext); + } + } + return jdbcDataContext; + } + + @Override + public DataContext refreshSchemas() { + try { + return getDataContext().refreshSchemas(); + } catch (MetaModelException e) { + LOG.error("Refreshing schema failed", e); + return null; + } + } + + @Override + public List<Schema> getSchemas() { + return getDataContext().getSchemas(); + } + + @Override + public List<String> getSchemaNames() { + return getDataContext().getSchemaNames(); + } + + @Override + public Schema getDefaultSchema() { + return getDataContext().getDefaultSchema(); + } + + @Override + public Schema getSchemaByName(String 
s) { + return getDataContext().getSchemaByName(s); + } + + @Override + public InitFromBuilder query() { + try { + return getDataContext().query(); + } catch (MetaModelException e) { + LOG.error("Initiating a query failed", e); + return null; + } + } + + @Override + public Query parseQuery(String s) { + return getDataContext().parseQuery(s); + } + + @Override + public DataSet executeQuery(Query query) { + return getDataContext().executeQuery(query); + } + + @Override + public CompiledQuery compileQuery(Query query) { + return getDataContext().compileQuery(query); + } + + @Override + public DataSet executeQuery(CompiledQuery compiledQuery, Object... objects) { + return getDataContext().executeQuery(compiledQuery, objects); + } + + @Override + public DataSet executeQuery(String s) { + return getDataContext().executeQuery(s); + } + + @Override + public UpdateSummary executeUpdate(UpdateScript updateScript) { + return getDataContext().executeUpdate(updateScript); + } + + @Override + public Column getColumnByQualifiedLabel(String s) { + return getDataContext().getColumnByQualifiedLabel(s); + } + + @Override + public Table getTableByQualifiedLabel(String s) { + return getDataContext().getTableByQualifiedLabel(s); + } + + /** + * Close hive data context and clear all its subsequent jdbc data contexts + */ + public void close() { + synchronized (isClosed) { + if (!isClosed) { + for (JdbcDataContext jdbcDataContext : dataContextPool) { + Connection connection = jdbcDataContext.getConnection(); + jdbcDataContext.close(connection); + } + dataContextPool.clear(); + isClosed = true; + } + } + } + + /** + * Execute hive query sql + * + * @param hiveQuery query to be executed + * @throws GoraException throw if a SQLException is thrown + */ + public void executeHiveQL(String hiveQuery) throws GoraException { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing the query : {}", hiveQuery); + } + Connection connection = getDataContext().getConnection(); + Statement statement 
= connection.createStatement(); + statement.execute(hiveQuery); + statement.close(); + } catch (SQLException e) { + throw new GoraException(e); + } + } + + /** + * Create a prepared statement using the given hive sql query + * + * @param hiveQuery query to be executed + * @return {@link org.apache.hive.jdbc.HivePreparedStatement} hive prepared statement + * @throws GoraException throw if a SQLException is thrown + */ + public PreparedStatement getPreparedStatement(String hiveQuery) throws GoraException { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating prepared statement for the query : {}", hiveQuery); + } + Connection connection = getDataContext().getConnection(); + return connection.prepareStatement(hiveQuery); + } catch (SQLException e) { + throw new GoraException(e); + } + } + + /** + * Generate DataContextPropertiesImpl using basic properties to establish a connection to Hive + * backend service + * + * @param hiveStoreParameters hive store parameters including at least server url + * @return DataContextPropertiesImpl connection properties + */ + private DataContextPropertiesImpl generateDataContextProperties( + HiveStoreParameters hiveStoreParameters) { + final DataContextPropertiesImpl properties = new DataContextPropertiesImpl(); + properties.put(DataContextPropertiesImpl.PROPERTY_DATA_CONTEXT_TYPE, + HiveStoreParameters.HIVE_DATA_CONTEXT_TYPE); + properties.put(DataContextPropertiesImpl.PROPERTY_URL, hiveStoreParameters.getServerUrl()); + properties + .put(DataContextPropertiesImpl.PROPERTY_DRIVER_CLASS, hiveStoreParameters.getDriverName()); + return properties; + } +} \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java new file mode 100644 index 0000000..9def8a7 --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMapping.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.hive.store; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Mapping definitions for Hive Store + */ +public class HiveMapping { + + public static final String DEFAULT_KEY_NAME = "primary_key"; + + /** + * Name of the schema table to be used + */ + private String tableName; + + /** + * List of field names in the schema + */ + private List<String> fields = new ArrayList<>(); + + /** + * Map of column names mapping to field names + */ + private Map<String, String> columnFieldMap = new HashMap<>(); + + /** + * Getter for table name of the schema + * @return table name as a String + */ + public String getTableName() { + return tableName; + } + + /** + * Setter for the table name of the schema + * @param tableName table name as a string + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * Getter for the list of field names in the schema + * @return field names as a list of strings. + */ + public List<String> getFields() { + return fields; + } + + /** + * Setter for the list of field names in the schema + * @param fields field names as a list of strings. 
+ */ + public void setFields(List<String> fields) { + this.fields = fields; + } + + /** + * Getter for columnFieldMap + * @return map of column names to field names + */ + public Map<String, String> getColumnFieldMap() { + return columnFieldMap; + } + + /** + * Setter for columnFieldMap + * @param columnFieldMap map of column names to field names map + */ + public void setColumnFieldMap(Map<String, String> columnFieldMap) { + this.columnFieldMap = columnFieldMap; + } +} diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java new file mode 100644 index 0000000..fe0d931 --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveMappingBuilder.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.store; + + +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.gora.util.GoraException; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is for reading a given mapping file and building the hive mappings + * + * @param <K> Key class + * @param <T> Persistent class + */ +public class HiveMappingBuilder<K, T> { + + private static final Logger LOG = LoggerFactory.getLogger((MethodHandles.lookup().lookupClass())); + + //Tag names + private static final String CLASS_TAG = "class"; + private static final String FIELD_TAG = "field"; + + //Attribute names + private static final String KEYCLASS_ATTRIBUTE = "keyClass"; + private static final String TABLE_ATTRIBUTE = "table"; + private static final String NAME_ATTRIBUTE = "name"; + + private HiveStore<?, ?> dataStore; + + public HiveMappingBuilder(HiveStore<K, ?> dataStore) { + this.dataStore = dataStore; + } + + /** + * Reading the given file to build the Hive Mappings. 
+ * + * @param filename path of the mapping file + * @return {@link org.apache.gora.hive.store.HiveMapping} hive mappings + * @throws GoraException exception in reading mappings + */ + @SuppressWarnings("unchecked") + public HiveMapping readMappingFile(String filename) throws GoraException { + HiveMapping mapping = new HiveMapping(); + final Class<T> persistentClass = (Class<T>) dataStore.getPersistentClass(); + final Class<K> keyClass = (Class<K>) dataStore.getKeyClass(); + final SAXBuilder saxBuilder = new SAXBuilder(); + final InputStream is = getClass().getClassLoader().getResourceAsStream(filename); + if (is == null) { + throw new GoraException("hive mapping file:" + filename + " could not be loaded"); + } + final Element root; + try { + root = saxBuilder.build(is).getRootElement(); + } catch (JDOMException | IOException e) { + throw new GoraException("Reading hive mapping file : " + filename + " failed", e); + } + List<Element> classElements = root.getChildren(CLASS_TAG); + if (classElements == null || classElements.isEmpty()) { + throw new GoraException( + "Could not find any class definition in the mapping file:" + filename); + } + parseClassElements(mapping, persistentClass, keyClass, classElements); + if (!validateSchema(mapping)) { + throw new GoraException("Schema validation failed for the mapping file: " + filename); + } + return mapping; + } + + /** + * Validate hive schema mappings + * + * @param mapping Hive mappings + * @return true if a valid schema. 
otherwise false + */ + private boolean validateSchema(HiveMapping mapping) { + List<String> fields = mapping.getFields(); + if (fields == null || fields.isEmpty()) { + LOG.error("table should have at least single column"); + return false; + } else { + for (String fieldName : fields) { + if (HiveMapping.DEFAULT_KEY_NAME.equals(fieldName)) { + LOG.error("\'{}\' is a reserved keyword and cannot be used as a field name", + HiveMapping.DEFAULT_KEY_NAME); + return false; + } + } + return true; + } + } + + @SuppressWarnings("unchecked") + private void parseClassElements(HiveMapping hiveMapping, Class<T> persistentClass, + Class<K> keyClass, List<Element> classElements) { + for (Element classElement : classElements) { + String persistentClassName = classElement.getAttributeValue(NAME_ATTRIBUTE); + String keyClassName = classElement.getAttributeValue(KEYCLASS_ATTRIBUTE); + //Find a class which matches persistent class name and key class name + if (persistentClassName != null && keyClassName != null && persistentClassName + .equals(persistentClass.getName()) && keyClassName + .equals(keyClass.getName())) { + hiveMapping.setTableName(dataStore + .getSchemaName(classElement.getAttributeValue(TABLE_ATTRIBUTE), + dataStore.getPersistentClass())); + List<Element> fieldElments = classElement.getChildren(FIELD_TAG); + parseFieldElements(hiveMapping, fieldElments); + break; + } + } + } + + private void parseFieldElements(HiveMapping hiveMapping, List<Element> fieldElments) { + if (fieldElments != null) { + List<String> fieldNames = new ArrayList<>(); + Map<String, String> columnFieldMap = new HashMap<>(); + for (Element field : fieldElments) { + String fieldName = field.getAttributeValue(HiveMappingBuilder.NAME_ATTRIBUTE); + if (fieldName == null || fieldName.isEmpty()) { + LOG.warn("Field without a name attribute is found and that field will be ignored"); + } else { + fieldNames.add(fieldName); + columnFieldMap.put(fieldName.toLowerCase(Locale.getDefault()), fieldName); + } + } + 
hiveMapping.setFields(fieldNames); + hiveMapping.setColumnFieldMap(columnFieldMap); + } + } +} \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java new file mode 100644 index 0000000..d04574e --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStore.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.store; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema.Field; +import org.apache.gora.hive.query.HiveQuery; +import org.apache.gora.hive.query.HiveResult; +import org.apache.gora.hive.util.HiveQueryBuilder; +import org.apache.gora.hive.util.HiveResultParser; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.GoraException; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.delete.DeleteFrom; +import org.apache.metamodel.drop.DropTable; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.query.builder.SatisfiedSelectBuilder; +import org.apache.metamodel.query.builder.SatisfiedWhereBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of a Hive data store to be used by gora. 
+ * + * @param <K> class to be used for the key + * @param <T> class to be persisted within the store + */ +public class HiveStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + private static final Logger LOG = LoggerFactory.getLogger(HiveStore.class); + + private static final String PARSE_MAPPING_FILE_KEY = "gora.hive.mapping.file"; + + private static final String DEFAULT_MAPPING_FILE = "gora-hive-mapping.xml"; + + + private volatile HiveDataContext dataContext; + private HiveStoreParameters hiveStoreParameters; + private HiveMapping mapping; + private Table schemaTable; + private HiveQueryBuilder queryBuilder; + private HiveResultParser resultParser; + + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) + throws GoraException { + LOG.debug("Initializing Hive store"); + super.initialize(keyClass, persistentClass, properties); + hiveStoreParameters = HiveStoreParameters.load(properties); + mapping = new HiveMappingBuilder<Object, Object>((HiveStore<Object, ?>) this) + .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); + try { + dataContext = new HiveDataContext(hiveStoreParameters); + } catch (Exception e) { + LOG.error("Data context creation failed", e); + throw new GoraException(e); + } + queryBuilder = new HiveQueryBuilder(this, mapping); + resultParser = new HiveResultParser(this); + } + + @Override + public String getSchemaName() { + return getSchemaName(mapping.getTableName(), persistentClass); + } + + @Override + protected String getSchemaName(String mappingSchemaName, Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public void createSchema() throws GoraException { + if (!schemaExists()) { + LOG.info("Creating hive schema {}", getSchemaName()); + String hiveQuery = queryBuilder.buildCreateQuery(hiveStoreParameters.getDatabaseName(), + fieldMap); + if (LOG.isDebugEnabled()) { + LOG.debug("Hive 
schema creation query : {}", hiveQuery); + } + dataContext.executeHiveQL(hiveQuery); + dataContext.refreshSchemas(); + } + } + + @Override + public void deleteSchema() throws GoraException { + if (schemaExists()) { + LOG.info("Deleting hive schema {}", getSchemaName()); + DropTable dropTable = new DropTable(getSchemaTable()); + dataContext.executeUpdate(dropTable); + dataContext.refreshSchemas(); + schemaTable = null; + } + } + + @Override + public boolean schemaExists() throws GoraException { + Table table = getSchemaTable(); + return (table != null); + } + + @Override + public boolean exists(K key) throws GoraException { + DataSet dataSet = executeGetQuery(key, new String[]{HiveMapping.DEFAULT_KEY_NAME}); + return (dataSet != null && dataSet.next()); + } + + @Override + public T get(K key, String[] fields) throws GoraException { + if (fields == null || fields.length == 0) { + fields = getFields(); + } + DataSet dataSet = executeGetQuery(key, fields); + return newInstance(dataSet); + } + + /** + * Put an persistent object into the hive store Though we use a key value, currently hive does not + * validate integrity constraints and Hive server may not support update queries on a particular + * key + * + * @param key the key of the object. + * @param obj the Persistent object. + * @throws GoraException throws if putting object is failed + */ + @Override + public void put(K key, T obj) throws GoraException { + try { + List<Object> parementerList = new ArrayList<>(); + String sql = queryBuilder.buildInsertQuery(key, obj, fieldMap, parementerList); + PreparedStatement statement = dataContext.getPreparedStatement(sql); + for (int i = 0; i < parementerList.size(); i++) { + statement.setObject(i + 1, parementerList.get(i)); + } + statement.execute(); + } catch (IOException | SQLException e) { + throw new GoraException(e); + } + } + + /** + * This deletes a record from the hive store for the supplied key. 
But, delete queries do not + * support in all hive servers and they require some specific server configurations. + * + * @param key the key of the deleting entry. + */ + @Override + public boolean delete(K key) throws GoraException { + Table table = getSchemaTable(); + DeleteFrom delete = new DeleteFrom(table).where(HiveMapping.DEFAULT_KEY_NAME).eq(key); + dataContext.executeUpdate(delete); + return true; + } + + /** + * This deletes all the matching records from hive store. But, delete queries do not support in + * all hive servers and they require some specific server configurations. + * + * @param query matching records to this query will be deleted. + * @return number of deleted entires. -1 if all entries were deleted + */ + @Override + public long deleteByQuery(Query<K, T> query) throws GoraException { + if (query.getKey() == null && query.getStartKey() == null && query.getEndKey() == null) { + deleteSchema(); + createSchema(); + return -1; + } else { + try { + int deleteCount = 0; + Result<K, T> result = query.execute(); + while (result.next()) { + if (this.delete(result.getKey())) { + deleteCount++; + } + } + return deleteCount; + } catch (Exception e) { + throw new GoraException(e); + } + } + } + + @Override + public Result<K, T> execute(Query<K, T> query) throws GoraException { + String[] fields = query.getFields(); + SatisfiedSelectBuilder<?> builder; + if (fields == null || fields.length == 0) { + builder = dataContext.query().from(getSchemaTable()).selectAll(); + } else { + int fieldLength = fields.length; + fields = Arrays.copyOf(fields, fieldLength + 1); + fields[fieldLength] = HiveMapping.DEFAULT_KEY_NAME; + builder = dataContext.query().from(getSchemaTable()).select(fields); + } + K startKey = query.getStartKey(); + K endKey = query.getEndKey(); + if (startKey != null && startKey.equals(endKey)) { + builder.where(HiveMapping.DEFAULT_KEY_NAME).eq(startKey); + } else { + if (startKey != null) { + 
builder.where(HiveMapping.DEFAULT_KEY_NAME).greaterThanOrEquals(startKey); + } + + if (endKey != null) { + builder.where(HiveMapping.DEFAULT_KEY_NAME).lessThanOrEquals(endKey); + } + } + if (query.getLimit() > 0) { + builder.limit(((Long) query.getLimit()).intValue()); + } + return new HiveResult<>(this, query, dataContext.executeQuery(builder.toQuery())); + } + + @Override + public Query<K, T> newQuery() { + return new HiveQuery<>(this); + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(getConf()); + partitions.add(partitionQuery); + return partitions; + } + + @Override + public void flush() { + dataContext.refreshSchemas(); + } + + @Override + public void close() { + dataContext.close(); + } + + @Override + protected String[] getFields() { + List<String> fields = mapping.getFields(); + return fields.toArray(new String[0]); + } + + /** + * Creates a new Persistent instance with the values in 'dataSet' for the fields selected in the + * query + * + * @param dataSet result data set from a hive query + * @return a persistence class instance which content was deserialized + */ + private T newInstance(DataSet dataSet) throws GoraException { + if (dataSet == null) { + return null; + } + dataSet.next(); + return readObject(dataSet.getRow()); + } + + /** + * Return database schemaTable for a given schema name + * + * @return org.apache.metamodel.schema.Table schemaTable + */ + private Table getSchemaTable() throws GoraException { + if (schemaTable != null && schemaTable.getName().equalsIgnoreCase(getSchemaName())) { + return schemaTable; + } + org.apache.metamodel.schema.Schema schema = dataContext + .getSchemaByName(hiveStoreParameters.getDatabaseName()); + if (schema == null) { + throw new GoraException( + "Could not find database for name : " + 
hiveStoreParameters.getDatabaseName()); + } + schemaTable = schema.getTableByName(getSchemaName()); + return schemaTable; + } + + /** + * Read a resulted row object and parse it to a persistent object + * + * @param row {@link org.apache.metamodel.data.Row} Resulted row object + * @return Persistent object + * @throws GoraException throws if reading of data is failed + */ + public T readObject(Row row) throws GoraException { + if (row == null || row.size() == 0) { + LOG.error("Data set is empty"); + return null; + } + T persistent = newPersistent(); + List<SelectItem> selectItems = row.getSelectItems(); + for (SelectItem selectItem : selectItems) { + Column column = selectItem.getColumn(); + if (HiveMapping.DEFAULT_KEY_NAME.equalsIgnoreCase(column.getName())) { + continue; + } + Field field = fieldMap.get(mapping.getColumnFieldMap().get(column.getName())); + Object value = row.getValue(column); + if (value != null) { + persistent.put(field.name(), resultParser.parseSchema(field.schema(), value)); + persistent.isDirty(field.name()); + } + } + persistent.clearDirty(); + return persistent; + } + + /** + * Read the key value from a given data row + * + * @param row Resulted data row + * @return value of the key field + * @throws GoraException throws if reading of the key value is failed + */ + @SuppressWarnings("unchecked") + public K readKey(Row row) throws GoraException { + if (row == null || row.size() == 0) { + LOG.error("Data set is empty"); + return null; + } + Column keyColumn = getSchemaTable().getColumnByName(HiveMapping.DEFAULT_KEY_NAME); + return (K) row.getValue(keyColumn); + } + + /** + * Execute a select query based on key value + * + * @param key key value + * @param fields filed names to be selected + * @return Resulted data set + * @throws GoraException throws if selection query is failed + */ + private DataSet executeGetQuery(K key, String[] fields) throws GoraException { + Table table = getSchemaTable(); + SatisfiedWhereBuilder<?> query = 
dataContext.query().from(table).select(fields) + .where(HiveMapping.DEFAULT_KEY_NAME).eq(key); + String sql = query.toQuery().toSql(); + return dataContext.executeQuery(sql); + } +} + diff --git a/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java new file mode 100644 index 0000000..710017d --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/store/HiveStoreParameters.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Holder for the settings that are required to open a connection to a Hive backend service.
 * Instances are obtained through {@link #load(Properties)}.
 */
public class HiveStoreParameters {

  /**
   * Property key of the Hive server url. Accepted url shapes include
   * {@code jdbc:hive2://<host>:<port>/<db>;initFile=<file>},
   * {@code jdbc:hive2:///;initFile=<file>},
   * {@code jdbc:hive2://<host>:<port>/<db>;transportMode=http;httpPath=<http_endpoint>},
   * {@code jdbc:hive2://<host>:<port>/<db>;ssl=true;sslTrustStore=<trust_store_path>;trustStorePassword=<trust_store_password>}
   * and
   * {@code jdbc:hive2://<zookeeper quorum>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2}.
   */
  private static final String HIVE_SERVER_URL_PROPERTY = "gora.hive.server.url";

  /** Property key of the Hive database name. */
  private static final String HIVE_DATABASE_NAME_PROPERTY = "gora.hive.database.name";

  /** Property key of the JDBC driver class name. */
  private static final String HIVE_DRIVER_NAME_PROPERTY = "gora.hive.driver.name";

  /** Database name used when the database property is absent. */
  private static final String HIVE_DEFAULT_DATABASE_NAME = "default";

  /** Driver class name used when the driver property is absent. */
  public static final String HIVE_DEFAULT_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

  /** Data context type constant for Hive connections. */
  public static final String HIVE_DATA_CONTEXT_TYPE = "jdbc";

  private String databaseName;
  private String serverUrl;
  private String driverName;

  /**
   * Not meant to be instantiated directly; use {@link #load(Properties)}.
   */
  private HiveStoreParameters() {
  }

  /**
   * Builds the store parameters out of the given properties. The database name and the driver
   * name fall back to their defaults when missing; the server url has no default and stays null
   * when the property is not set.
   *
   * @param properties hive properties
   * @return HiveStoreParameters object
   */
  public static HiveStoreParameters load(Properties properties) {
    final String database =
        properties.getProperty(HIVE_DATABASE_NAME_PROPERTY, HIVE_DEFAULT_DATABASE_NAME);
    final String url = properties.getProperty(HIVE_SERVER_URL_PROPERTY);
    final String driver =
        properties.getProperty(HIVE_DRIVER_NAME_PROPERTY, HIVE_DEFAULT_DRIVER_NAME);

    final HiveStoreParameters parameters = new HiveStoreParameters();
    parameters.databaseName = database;
    parameters.serverUrl = url;
    parameters.driverName = driver;
    return parameters;
  }

  /**
   * @return the hive server url
   */
  public String getServerUrl() {
    return this.serverUrl;
  }

  /**
   * @param serverUrl the hive server url
   */
  public void setServerUrl(String serverUrl) {
    this.serverUrl = serverUrl;
  }

  /**
   * @return the hive driver class name
   */
  public String getDriverName() {
    return this.driverName;
  }

  /**
   * @param driverName the hive driver class name
   */
  public void setDriverName(String driverName) {
    this.driverName = driverName;
  }

  /**
   * @return the hive database name
   */
  public String getDatabaseName() {
    return this.databaseName;
  }

  /**
   * @param databaseName the hive database name
   */
  public void setDatabaseName(String databaseName) {
    this.databaseName = databaseName;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the Hive store related classes. + */ +package org.apache.gora.hive.store; \ No newline at end of file diff --git a/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java new file mode 100644 index 0000000..dadd8ca --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveQueryBuilder.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.util; + +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.gora.hive.store.HiveMapping; +import org.apache.gora.hive.store.HiveStore; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is to build hive SQL queries for schema creation and inserting items into the hive + * store + */ +public class HiveQueryBuilder { + + private static final Logger LOG = LoggerFactory.getLogger((MethodHandles.lookup().lookupClass())); + + private HiveStore<?, ?> hiveStore; + private HiveMapping hiveMapping; + + /** + * common characters for building queries + **/ + private static final char QUESTION_SYMBOL = '?'; + private static final char QUOTE_SYMBOL = '\''; + private static final char OPEN_BRACKET_SYMBOL = '('; + private static final char CLOSE_BRACKET_SYMBOL = ')'; + private static final char COMMA_SYMBOL = ','; + private static final char SPACE_SYMBOL = ' '; + private static final char PERIOD_SYMBOL = '.'; + + public HiveQueryBuilder(HiveStore<?, ?> hiveStore, HiveMapping hiveMapping) { + this.hiveStore = hiveStore; + this.hiveMapping = hiveMapping; + } + + /** + * Create hive sql query to create the schema table + * + * @param databaseName hive database name + * @param fieldMap map of avro fields to their 
field names + * @return hive sql query + * @throws GoraException throw if the generation of the sql is failed + */ + public String buildCreateQuery(String databaseName, Map<String, Field> fieldMap) + throws GoraException { + //Initiate create query + StringBuilder createQuery = new StringBuilder(); + createQuery.append("create table if not exists ") + .append(databaseName).append(PERIOD_SYMBOL) + .append(hiveStore.getSchemaName()).append(OPEN_BRACKET_SYMBOL); + //Create an Avro schema including fields only in mappings + Schema avroSchema = createAvroSchema(fieldMap); + try { + buildColumnType(createQuery, avroSchema); + } catch (SerDeException e) { + throw new GoraException("Schema inspection has been failed.", e); + } + createQuery.append(CLOSE_BRACKET_SYMBOL); + return createQuery.toString(); + } + + /** + * Create hive query to insert persistent item into hive store + * + * @param key value of the key + * @param value item to be stored + * @param fieldMap schema fields mapping to their names + * @param parameterList empty list to be filled with parameters of the sql + * @return parameterized hive sql string. + * @throws GoraException throw if the generation of the sql is failed + */ + public String buildInsertQuery(Object key, PersistentBase value, Map<String, Field> fieldMap, + List<Object> parameterList) + throws GoraException { + StringBuilder insert = new StringBuilder(); + StringBuilder fields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + + insert.append("insert into").append(SPACE_SYMBOL) + .append(hiveStore.getSchemaName()).append(OPEN_BRACKET_SYMBOL); + values.append("select").append(SPACE_SYMBOL); + + //add key value + fields.append(HiveMapping.DEFAULT_KEY_NAME); + parameterList.add((key instanceof String) ? 
key.toString() : key); + values.append(QUESTION_SYMBOL); + + //add field values + for (String fieldName : hiveMapping.getFields()) { + Field field = fieldMap.get(fieldName); + if (field == null) { + LOG.warn("{} is skipped as it is not recognised as a field in the schema", fieldName); + continue; + } + Object fieldValue = value.get(field.pos()); + if (value.isDirty(field.name()) && fieldValue != null) { + Object serializedValue = serializeValue(parameterList, field.schema(), fieldValue); + if (serializedValue != null && (!(serializedValue instanceof String) + || !((String) serializedValue).isEmpty())) { + fields.append(COMMA_SYMBOL).append(SPACE_SYMBOL) + .append(field.name().toLowerCase(Locale.getDefault())); + values.append(COMMA_SYMBOL).append(SPACE_SYMBOL) + .append(serializedValue); + } + } + } + fields.append(CLOSE_BRACKET_SYMBOL).append(SPACE_SYMBOL); + insert.append(fields).append(values); + return insert.toString(); + } + + /** + * Inspect avro schema and append its fields with their types to the create query. + * + * @param createQuery String builder to generate the create query + * @param schema Avro schema to derive column names and types + * @throws SerDeException Throw if schema inspection failed + */ + private void buildColumnType(StringBuilder createQuery, Schema schema) throws SerDeException { + AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(schema); + List<TypeInfo> typeInfos = generator.getColumnTypes(); + List<String> names = generator.getColumnNames(); + for (int i = 0; i < names.size(); i++) { + createQuery.append(names.get(i)).append(" ") + .append(typeInfos.get(i).getTypeName()); + if (i < names.size() - 1) { + createQuery.append(COMMA_SYMBOL).append(SPACE_SYMBOL); + } + } + } + + /** + * Create an avro schema including keys and fields of the mapping. 
+ * + * @param fieldMap Avro field map which maps each field to its name + * @return Genereated avro schema + * @throws GoraException throws if the schema generation is failed + */ + private Schema createAvroSchema(Map<String, Field> fieldMap) throws GoraException { + Class<?> persistentClass = hiveStore.getPersistentClass(); + Schema avroSchema = Schema.createRecord(persistentClass.getSimpleName(), null, + persistentClass.getPackage().getName(), false); + List<Field> avroFieldList = new ArrayList<>(); + + avroFieldList.add(new Field(HiveMapping.DEFAULT_KEY_NAME, getKeySchema(), null, 1)); + + List<String> fieldNames = hiveMapping.getFields(); + for (String fieldName : fieldNames) { + Field field = fieldMap.get(fieldName); + if (field == null) { + throw new GoraException( + "Could not find a avro field for field name : " + fieldName); + } + avroFieldList.add(new Field(field.name(), field.schema(), field.doc(), field.defaultVal())); + } + avroSchema.setFields(avroFieldList); + return avroSchema; + } + + /** + * Derive the schema for the key field + * + * @return schema for the key field + * @throws GoraException throw if no schema could identified for the key class + */ + private Schema getKeySchema() throws GoraException { + final String keyName = hiveStore.getKeyClass().getSimpleName().toUpperCase(Locale.getDefault()); + try { + Type keyType = Type.valueOf(keyName); + return Schema.create(keyType); + } catch (IllegalArgumentException | AvroRuntimeException e) { + throw new GoraException("Failed to derive schema for the key class name : " + keyName, e); + } + } + + /** + * Serialize the given value according to its schema. 
if a primitive type has to be included as a + * a value, it will be added to the parameter list + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema the value should be serialized based on the schema + * @param value the value to be serialized + * @return serialized value + * @throws GoraException throw if serialization is failed. + */ + private Object serializeValue(List<Object> parameterList, Schema schema, Object value) + throws GoraException { + if (value == null) { + return getNullValue(parameterList, schema); + } + final Type type = schema.getType(); + switch (type) { + case RECORD: + return serializeRecord(parameterList, schema, value); + case MAP: + return serializeMap(parameterList, schema, value); + case ARRAY: + return serializeArray(parameterList, schema, value); + case UNION: + return serializeUnion(parameterList, schema, value); + case STRING: + case ENUM: + parameterList.add(value.toString()); + return QUESTION_SYMBOL; + case BYTES: + return serializeBytes(parameterList, value); + default: + parameterList.add(value); + return QUESTION_SYMBOL; + } + } + + /** + * A record is handled as a struct in hive context and the sql query for structs are define as + * named_struct(field1, value1, field2, value2,..). Moreover nested structs are not allowed to be + * null and they have to be empty structs which all fields represent respective null values. + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema the record schema + * @param value the record object + * @return serialized value + * @throws GoraException throw if serialization is failed. 
+ */ + private Object serializeRecord(List<Object> parameterList, Schema schema, Object value) + throws GoraException { + if (value == null) { + return getNullValue(parameterList, schema); + } else if (!(value instanceof PersistentBase)) { + throw new GoraException("Record value is not a persistent object"); + } + PersistentBase record = (PersistentBase) value; + + StringBuilder valueBuilder = new StringBuilder(); + valueBuilder.append("named_struct("); + + List<Field> fields = schema.getFields(); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + valueBuilder.append(QUOTE_SYMBOL).append(field.name()).append(QUOTE_SYMBOL) + .append(COMMA_SYMBOL).append(SPACE_SYMBOL); + valueBuilder.append(serializeValue(parameterList, field.schema(), record.get(field.pos()))); + if (i < fields.size() - 1) { + valueBuilder.append(","); + } + } + return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString(); + } + + /** + * Serialize a given map by serializing its values based on schema.valueType. map values in the + * sql have to be defined as map(key1, value1, key2, value2,..). If the map value is null or + * empty, an empty map \"map(null, null_value_type)\" should be returned and it is required to + * represent the null value of the respective value type in the first value entry. Ex : if the map + * is binary type. the empty map will be map(null, binary(null)) + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema the map schema + * @param value the map object + * @return serialized value + * @throws GoraException throw if serialization is failed. 
+ */ + private Object serializeMap(List<Object> parameterList, Schema schema, Object value) + throws GoraException { + if (value == null) { + return getNullValue(parameterList, schema); + } else if (!(value instanceof Map)) { + throw new GoraException("Value is not an object of java.util.Map for schema type MAP"); + } + Map<?, ?> map = (Map<?, ?>) value; + if (map.keySet().isEmpty()) { + return getNullValue(parameterList, schema); + } + //create a map serializing all its entries + StringBuilder valueBuilder = new StringBuilder(); + valueBuilder.append("map("); + int size = map.keySet().size(); + for (Map.Entry<?, ?> entry : map.entrySet()) { + valueBuilder.append(QUOTE_SYMBOL).append(entry.getKey().toString()).append(QUOTE_SYMBOL) + .append(COMMA_SYMBOL).append(SPACE_SYMBOL) + .append(serializeValue(parameterList, schema.getValueType(), entry.getValue())); + if (size-- > 1) { + valueBuilder.append(COMMA_SYMBOL).append(SPACE_SYMBOL); + } + } + return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString(); + } + + /** + * Serialize a given list by serializing its values based on schema.elementType. list values in + * the sql have to be defined as array(value1, value2,..). If the list is null or empty, an empty + * list \"array(null)\" should be returned and it is required to represent the null value of the + * respective value type. Ex : if the array is binary type. the empty array will be + * array(binary(null)) + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema the array schema + * @param value the list object + * @return serialized value + * @throws GoraException throw if serialization is failed. 
+ */ + private Object serializeArray(List<Object> parameterList, Schema schema, Object value) + throws GoraException { + if (!(value instanceof List) && value != null) { + throw new GoraException("Value is not an object of java.util.List for schema type ARRAY"); + } + + List<?> list = (List<?>) value; + if (list == null || list.isEmpty()) { + return getNullValue(parameterList, schema); + } else { + StringBuilder valueBuilder = new StringBuilder(); + valueBuilder.append("array("); + for (int i = 0; i < list.size(); i++) { + valueBuilder.append(serializeValue(parameterList, schema.getElementType(), list.get(i))); + if (i < list.size() - 1) { + valueBuilder.append(", "); + } + } + return valueBuilder.append(CLOSE_BRACKET_SYMBOL).toString(); + } + } + + /** + * Serialize a given object based on union types. If the union type has only two values and one of + * them is null type, the schema is considered as a type of the not null type. However if there + * are more than one not-null types, the value should be defined as create_union(position, + * serialized_value). Ex : if the field is UNION<String, Float>, the query should be + * "create_union(1, 'value')". the position is counted using only not-null types. + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema the union schema + * @param value the value object + * @return serialized value + * @throws GoraException throw if serialization is failed. 
+ */ + private Object serializeUnion(List<Object> parameterList, Schema schema, Object value) + throws GoraException { + List<Schema> schemas = schema.getTypes(); + + if (schemas.size() == 2 && isNullable(schema)) { + return serializeValue(parameterList, getValidSchema(schema), value); + } + + StringBuilder valueBuilder = new StringBuilder(); + int count = 1; + for (Schema valueSchema : schemas) { + if (!Type.NULL.equals(valueSchema.getType())) { + if (isValidType(valueSchema.getType(), value)) { + valueBuilder.append("create_union(") + .append(count).append(COMMA_SYMBOL).append(SPACE_SYMBOL) + .append(serializeValue(parameterList, valueSchema, value)) + .append(CLOSE_BRACKET_SYMBOL); + return valueBuilder.toString(); + } + count++; + } + } + throw new GoraException("Serializing union value is failed"); + } + + /** + * Serialize a given object into a binary. The sql string is defined as "binary(value)" + * + * @param parameterList carries the list of parameters to be injected into sql + * @param value the byte object + * @return serialized value + */ + private Object serializeBytes(List<Object> parameterList, Object value) { + if (value instanceof ByteBuffer) { + ByteBuffer clone = ByteBuffer.allocate(((ByteBuffer) value).capacity()); + clone.put((ByteBuffer) value); + ((ByteBuffer) value).rewind(); + clone.flip(); + value = QUOTE_SYMBOL + Charset.defaultCharset().decode(clone).toString() + QUOTE_SYMBOL; + } + parameterList.add(value); + return "binary(?)"; + } + + /** + * Returns the first not-null sub-schema from a union schema + * + * @param schemas Union schema + * @return first valid sub-schema + * @throws GoraException throw if a valid schema is not found + */ + static Schema getValidSchema(Schema schemas) throws GoraException { + for (Schema innerSchema : schemas.getTypes()) { + if (!Type.NULL.equals(innerSchema.getType())) { + return innerSchema; + } + } + throw new GoraException("Could not find a valid schema"); + } + + /** + * Generate the null value for 
a given schema type + * + * @param parameterList carries the list of parameters to be injected into sql + * @param schema schema to get null type + * @return null value for the schema.type + * @throws GoraException throw if the null value generation is failed + */ + private Object getNullValue(List<Object> parameterList, Schema schema) throws GoraException { + final Type type = schema.getType(); + switch (type) { + case BYTES: + return "binary(null)"; + case MAP: + return "map(null," + getNullValue(parameterList, schema.getValueType()) + + CLOSE_BRACKET_SYMBOL; + case ARRAY: + return "array(" + getNullValue(parameterList, schema.getElementType()) + + CLOSE_BRACKET_SYMBOL; + case UNION: + return serializeUnion(parameterList, schema, null); + case RECORD: + Class<?> clazz; + try { + clazz = ClassLoadingUtils.loadClass(schema.getFullName()); + } catch (ClassNotFoundException e) { + throw new GoraException(e); + } + @SuppressWarnings("unchecked") final PersistentBase emptyRecord = (PersistentBase) new BeanFactoryImpl( + hiveStore.getKeyClass(), clazz).newPersistent(); + return serializeRecord(parameterList, schema, emptyRecord); + default: + return null; + } + } + + /** + * Compare the object type and the required schema type and return true if the object type is + * compatible with the schema type. 
If the type matches none of the defined types, the object is + * considered to be a valid object for any type + * + * @param type schema type to be compared + * @param value object to be compared + * @return true if object is compatible with the schema type + */ + private boolean isValidType(Type type, Object value) { + switch (type) { + case INT: + return (value instanceof Integer); + case LONG: + return (value instanceof Long); + case BYTES: + return (value instanceof Byte[]) || (value instanceof ByteBuffer); + case NULL: + return (value == null); + case STRING: + return (value instanceof CharSequence); + case ENUM: + return (value instanceof Enum); + case DOUBLE: + return (value instanceof Double); + case FLOAT: + return (value instanceof Float); + case BOOLEAN: + return (value instanceof Boolean); + case MAP: + return (value instanceof Map); + case ARRAY: + return (value instanceof List) || (value != null && value.getClass().isArray()); + case RECORD: + return (value instanceof PersistentBase); + default: + return true; + } + } + + /** + * Check whether the given union schema contains any nullable sub-schemas + * + * @param schemas union schema + * @return true if the list of sub-schemas contains a nullable schema + */ + static boolean isNullable(Schema schemas) { + for (Schema innerSchema : schemas.getTypes()) { + if (innerSchema.getType().equals(Type.NULL)) { + return true; + } + } + return false; + } +} diff --git a/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java new file mode 100644 index 0000000..5ff436f --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/util/HiveResultParser.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.hive.util; + +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.util.Utf8; +import org.apache.gora.hive.store.HiveStore; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class parses a given result set into a persistent schema object + */ +public class HiveResultParser { + + private static final Logger LOG = LoggerFactory.getLogger((MethodHandles.lookup().lookupClass())); + + private HiveStore<?, ?> hiveStore; + + public HiveResultParser(HiveStore<?, ?> hiveStore) { + this.hiveStore = hiveStore; + } + + /** + * Parsing a value based on given schema + * @param schema schema value to be used + * @param value value to be parsed + * @return parsed object value + * @throws GoraException throw if parsing the object is failed + */ + 
public Object parseSchema(Schema schema, Object value) throws GoraException { + if (value == null) { + return null; + } + final Type type = schema.getType(); + switch (type) { + case INT: + return (value instanceof String) ? Integer.parseInt(String.valueOf(value)) : value; + case STRING: + return new Utf8(String.valueOf(value)); + case ENUM: + return AvroUtils.getEnumValue(schema, String.valueOf(value)); + case FLOAT: + return (value instanceof String) ? Float.parseFloat(String.valueOf(value)) : value; + case DOUBLE: + return (value instanceof String) ? Double.parseDouble(String.valueOf(value)) : value; + case LONG: + return (value instanceof String) ? Long.parseLong(String.valueOf(value)) : value; + case BOOLEAN: + return (value instanceof String) ? Boolean.parseBoolean(String.valueOf(value)) : value; + case BYTES: + return parseBytes(value); + case ARRAY: + return parseArray(value, schema); + case MAP: + return parseMap(value, schema); + case UNION: + return parseUnion(value, schema); + case RECORD: + return parseRecord(value, schema); + default: + return value; + } + } + + /** + * Parse a given bytes value into a ByteBuffer. 
When bytes are stored using a complex parent data + * type like map<string, bytes>, a quotation mark (') is added to the front and the end of the + * byte string; it is removed here. + * + * @param value value to be parsed + * @return {@link ByteBuffer} parsed byte buffer + */ + private Object parseBytes(Object value) { + if (value == null) { + return null; + } + if(LOG.isDebugEnabled()) { + LOG.debug("Parsing byte string :{}", value); + } + String byteString = value.toString(); + if (value instanceof byte[]) { + byte[] byteArray = (byte[]) value; + byteString = new String(byteArray, Charset.defaultCharset()); + if ('\'' == byteString.charAt(0)) { + byteString = byteString.substring(1); + } + if ('\'' == byteString.charAt(byteString.length() - 1)) { + byteString = byteString.substring(0, byteString.length() - 1); + } + } + return ByteBuffer.wrap(byteString.getBytes(Charset.defaultCharset())); + } + + /** + * Arrays are returned as json array strings and this will parse them back to lists. + * + * @param value json array string + * @param schema Array schema to be used for parsing array elements + * @return {@link List} parsed list + * @throws GoraException throw if parsing array values is failed + */ + private Object parseArray(Object value, Schema schema) throws GoraException { + if (value == null) { + return null; + } + if(LOG.isDebugEnabled()) { + LOG.debug("Parsing json array : {}", value); + } + Schema elementSchema = schema.getElementType(); + JSONArray jsonArray = new JSONArray(String.valueOf(value)); + List<Object> valueList = new ArrayList<>(); + for (int i = 0; i < jsonArray.length(); i++) { + valueList.add(parseSchema(elementSchema, jsonArray.get(i))); + } + return valueList; + } + + /** + * Maps are returned as json objects and this will parse them back to map objects + * + * @param value String of json object + * @param schema Map schema to be used for parsing map values + * @return {@link Map} Parsed map + * @throws GoraException throw if parsing map entries is failed + */ + private Object parseMap(Object value, Schema 
schema) throws GoraException { + if (value == null) { + return null; + } + if(LOG.isDebugEnabled()) { + LOG.debug("Parsing json object:{} as a map", value); + } + Schema valueSchema = schema.getValueType(); + JSONObject jsonObject = new JSONObject(String.valueOf(value)); + Map<CharSequence, Object> valueMap = new HashMap<>(); + Iterator<String> keys = jsonObject.keys(); + while (keys.hasNext()) { + String key = keys.next(); + valueMap.put(new Utf8(key), parseSchema(valueSchema, jsonObject.get(key))); + } + return valueMap; + } + + /** + * This parses the given object into a union-typed object. If there is only one not-null sub-type, + * the value is parsed according to that schema regardless of this union schema. If there are more + * than one not-null sub-types, the object is considered to be a json string with one field + * representing the position of the type of the given value among the list of sub-types.<br> + * Ex: if the schema is union<String, Array<int>>, the object will be {2, '[1,2,3]'} + * + * @param value union object + * @param schema union schema to be used for parsing the union value + * @return {@link Object} Parsed object + * @throws GoraException throw if parsing union value is failed + */ + private Object parseUnion(Object value, Schema schema) throws GoraException { + if(LOG.isDebugEnabled()) { + LOG.debug("Parsing json object:{} as a union", value); + } + List<Schema> typeSchemaList = schema.getTypes(); + Schema primarySchema = null; + if (typeSchemaList.size() == 2 && HiveQueryBuilder.isNullable(schema)) { + primarySchema = HiveQueryBuilder.getValidSchema(schema); + } else { + JSONObject unionObject = new JSONObject(String.valueOf(value)); + int position = Integer.parseInt(unionObject.keys().next()); + for (Schema typeSchema : typeSchemaList) { + if (!(Type.NULL.equals(typeSchema.getType())) && (position-- == 0)) { + primarySchema = typeSchema; + break; + } + } + } + if (primarySchema == null) { + return null; + } else { + return 
parseSchema(primarySchema, value); + } + } + + /** + * Parse a json object to a persistent record. + * + * @param value json object string + * @param schema record schema to be used for parsing the object + * @return persistent object + * @throws GoraException throw if parsing record value is failed + */ + private Object parseRecord(Object value, Schema schema) throws GoraException { + if(LOG.isDebugEnabled()) { + LOG.debug("Parsing json object:{} as a record", value); + } + Class<?> clazz; + try { + clazz = ClassLoadingUtils.loadClass(schema.getFullName()); + } catch (ClassNotFoundException e) { + throw new GoraException(e); + } + @SuppressWarnings("unchecked") final PersistentBase record = (PersistentBase) new BeanFactoryImpl( + hiveStore.getKeyClass(), clazz).newPersistent(); + JSONObject recordObject = new JSONObject(String.valueOf(value)); + for (Field recField : schema.getFields()) { + Schema innerSchema = recField.schema(); + if (recordObject.has(recField.name())) { + record.put(recField.pos(), parseSchema(innerSchema, recordObject.get(recField.name()))); + } + } + return record; + } +} diff --git a/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java b/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java new file mode 100644 index 0000000..884dd6b --- /dev/null +++ b/gora-hive/src/main/java/org/apache/gora/hive/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all utility classes to execute queries on Hive store + */ +package org.apache.gora.hive.util; \ No newline at end of file diff --git a/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java b/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java new file mode 100644 index 0000000..9354940 --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/GoraHiveTestDriver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive; + +import org.apache.gora.GoraTestDriver; +import org.apache.gora.hive.store.HiveStore; +import org.apache.gora.hive.util.HiveTestServer; + +public class GoraHiveTestDriver extends GoraTestDriver { + + private HiveTestServer testServer; + + public GoraHiveTestDriver() { + super(HiveStore.class); + } + + @Override + public void setUpClass() throws Exception { + log.info("setting up hive test driver"); + if (testServer == null) { + testServer = new HiveTestServer(); + } + testServer.start(); + } + + @Override + public void tearDownClass() throws Exception { + log.info("tearing down hive test driver"); + if (testServer != null) { + testServer.stop(); + } + } +} diff --git a/gora-hive/src/test/java/org/apache/gora/hive/package-info.java b/gora-hive/src/test/java/org/apache/gora/hive/package-info.java new file mode 100644 index 0000000..958eaab --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * This package contains Hive test driver implementation classes + */ +package org.apache.gora.hive; \ No newline at end of file diff --git a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java new file mode 100644 index 0000000..0d635f2 --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.store; + +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Set; +import org.apache.avro.util.Utf8; +import org.apache.gora.examples.generated.Employee; +import org.apache.gora.examples.generated.Metadata; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.hive.GoraHiveTestDriver; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.store.DataStoreTestUtil; +import org.apache.gora.util.GoraException; +import org.apache.gora.util.StringUtils; +import org.junit.Ignore; + +/** + * HiveStore Tests extending {@link DataStoreTestBase} which run the base JUnit test suite for + * Gora. + */ +public class TestHiveStore extends DataStoreTestBase { + + static { + try { + setTestDriver(new GoraHiveTestDriver()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void assertSchemaExists(String schemaName) throws Exception { + assertTrue(employeeStore.schemaExists()); + } + + @Override + public void assertPut(Employee employee) throws GoraException { + employeeStore.put(employee.getSsn().toString(), employee); + } + + @Override + public void testGetWithFields() throws Exception { + //Overrides DataStoreTestBase.testGetWithFields to avoid recursive field "boss" + Employee employee = DataStoreTestUtil.createEmployee(); + WebPage webpage = DataStoreTestUtil.createWebPage(); + employee.setWebpage(webpage); + String ssn = employee.getSsn().toString(); + employeeStore.put(ssn, employee); + employeeStore.flush(); + + String[] fields = ((HiveStore<String, Employee>) employeeStore).getFields(); + for (Set<String> subset : StringUtils.powerset(fields)) { + if (subset.isEmpty()) { + continue; + } + Employee after = employeeStore.get(ssn, subset.toArray(new String[subset.size()])); + Employee expected = 
Employee.newBuilder().build(); + for (String field : subset) { + int index = expected.getSchema().getField(field).pos(); + expected.put(index, employee.get(index)); + } + + DataStoreTestUtil.assertEqualEmployeeObjects(expected, after); + } + } + + @Override + public void testGet() throws Exception { + //Overrides DataStoreTestBase.testGet to avoid recursive field "boss" + log.info("test method: testGet"); + employeeStore.createSchema(); + Employee employee = DataStoreTestUtil.createEmployee(); + String ssn = employee.getSsn().toString(); + employeeStore.put(ssn, employee); + employeeStore.flush(); + Employee after = employeeStore.get(ssn, null); + DataStoreTestUtil.assertEqualEmployeeObjects(employee, after); + } + + @Override + public void testGetNested() throws Exception { + //Overrides DataStoreTestBase.testGetNested to avoid recursive field "boss" + Employee employee = DataStoreTestUtil.createEmployee(); + + WebPage webpage = new BeanFactoryImpl<>(String.class, WebPage.class).newPersistent(); + webpage.setUrl(new Utf8("url..")); + webpage.setContent(ByteBuffer.wrap("test content".getBytes(Charset.defaultCharset()))); + webpage.setParsedContent(new ArrayList<>()); + + Metadata metadata = new BeanFactoryImpl<>(String.class, Metadata.class).newPersistent(); + webpage.setMetadata(metadata); + employee.setWebpage(webpage); + String ssn = employee.getSsn().toString(); + + employeeStore.put(ssn, employee); + employeeStore.flush(); + Employee after = employeeStore.get(ssn, null); + DataStoreTestUtil.assertEqualEmployeeObjects(employee, after); + DataStoreTestUtil.assertEqualWebPageObjects(webpage, after.getWebpage()); + } + + @Ignore("Hive test server doesn't support deleting and updating entries") + @Override + public void testExists() throws Exception { + //Hive test server doesn't support deleting and updating entries + } + + @Ignore("Hive test server doesn't support deleting and updating entries") + @Override + public void testDelete() throws Exception { + //Hive 
test server doesn't support deleting and updating entries + } + + @Ignore("Hive test server doesn't support deleting and updating entries") + @Override + public void testDeleteByQuery() throws Exception { + //Hive test server doesn't support deleting and updating entries + } + + @Ignore("Hive test server doesn't support deleting and updating entries") + @Override + public void testDeleteByQueryFields() throws Exception { + //Hive test server doesn't support deleting and updating entries + } + + @Ignore("Hive test server doesn't support deleting and updating entries") + @Override + public void testUpdate() throws Exception { + //Hive test server doesn't support deleting and updating entries + } + + @Ignore("Hive datastore doesn't support recursive records") + @Override + public void testGetRecursive() throws Exception { + //Hive datastore doesn't support recursive records + } + + @Ignore("Hive datastore doesn't support recursive records") + @Override + public void testGetDoubleRecursive() throws Exception { + //Hive datastore doesn't support recursive records + } + + @Ignore("As recursive records are not supported, employee.boss field cannot be processed.") + @Override + public void testGet3UnionField() throws Exception { + //As recursive records are not supported, employee.boss field cannot be processed. + } +} diff --git a/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java b/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java new file mode 100644 index 0000000..04d11b2 --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/store/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all test classes to test Hive store + */ +package org.apache.gora.hive.store; \ No newline at end of file diff --git a/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java b/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java new file mode 100644 index 0000000..3ff464e --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/util/HiveTestServer.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.hive.util; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.Service; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.SessionHandle; +import org.apache.hive.service.server.HiveServer2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Hive test server implementation + */ +public class HiveTestServer { + + private static final Logger log = LoggerFactory.getLogger((MethodHandles.lookup().lookupClass())); + private HiveServer2 hiveServer; + + /** + * Initiate a hive server instance + * + * @throws Exception throws if server initiation is failed + */ + public void start() throws Exception { + log.info("Starting Hive Test Server..."); + if (hiveServer == null) { + hiveServer = new HiveServer2(); + hiveServer.init(new HiveConf()); + hiveServer.start(); + waitForStartup(); + log.info("Hive Test Server Started"); + } + } + + /** + * Waiting for maximum of one minute to start the server. 
+ * + * @throws Exception throws if server couldn't start within the time limit + */ + private void waitForStartup() throws Exception { + long timeout = TimeUnit.MINUTES.toMillis(1); + long unitOfWait = TimeUnit.SECONDS.toMillis(1) * 5; + CLIService hs2Client = getServiceClientInternal(); + SessionHandle sessionHandle = null; + for (int interval = 0; interval < timeout / unitOfWait; interval++) { + Thread.sleep(unitOfWait); + try { + Map<String, String> sessionConf = new HashMap<>(); + sessionHandle = hs2Client.openSession("hive", "", sessionConf); + return; + } catch (Exception e) { + //server hasn't started yet + } finally { + hs2Client.closeSession(sessionHandle); + } + } + throw new TimeoutException("Hive test server starting timeout"); + } + + /** + * Get the client service from the hive server instance. + * + * @return CLIService client service initiated in the hive server instance + */ + private CLIService getServiceClientInternal() { + for (Service service : hiveServer.getServices()) { + if (service instanceof CLIService) { + return (CLIService) service; + } + } + throw new IllegalStateException("Cannot find CLIService"); + } + + /** + * Stop hive test server + */ + public void stop() { + if (hiveServer != null) { + log.info("Stopping Hive Test Server..."); + hiveServer.stop(); + hiveServer = null; + log.info("Hive Test Server Stopped SucessFully"); + } + } +} diff --git a/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java b/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java new file mode 100644 index 0000000..f90e729 --- /dev/null +++ b/gora-hive/src/test/java/org/apache/gora/hive/util/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Hive test server implementation + */ +package org.apache.gora.hive.util; \ No newline at end of file diff --git a/gora-hive/src/test/resources/gora-hive-mapping.xml b/gora-hive/src/test/resources/gora-hive-mapping.xml new file mode 100644 index 0000000..6d31df9 --- /dev/null +++ b/gora-hive/src/test/resources/gora-hive-mapping.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<gora-otd> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" tableName="Employee"> + <field name="name"/> + <field name="dateOfBirth"/> + <field name="ssn"/> + <field name="salary"/> + <field name="webpage"/> + </class> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" tableName="WebPage"> + <field name="url"/> + <field name="content"/> + <field name="parsedContent"/> + <field name="outlinks"/> + <field name="headers"/> + <field name="metadata"/> + <field name="byteData"/> + <field name="stringData"/> + </class> + + + <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String"> + <field name="count"/> + </class> + +</gora-otd> diff --git a/gora-hive/src/test/resources/gora.properties b/gora-hive/src/test/resources/gora.properties new file mode 100644 index 0000000..8f625fc --- /dev/null +++ b/gora-hive/src/test/resources/gora.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +############################ +# HiveStore properties # +############################ + +gora.datastore.autocreateschema=true +gora.datastore.default=org.apache.gora.hive.store.HiveStore + +gora.hive.server.url=jdbc:hive2://localhost:10000/default +gora.hive.database.name=default + diff --git a/gora-hive/src/test/resources/hive-site.xml b/gora-hive/src/test/resources/hive-site.xml new file mode 100755 index 0000000..ce35e8c --- /dev/null +++ b/gora-hive/src/test/resources/hive-site.xml @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<!-- Hive Test Server Configurations --> +<configuration> + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + <property> + <name>datanucleus.schema.autoCreateTables</name> + <value>true</value> + </property> + <property> + <name>hive.security.authorization.enabled</name> + <value>false</value> + </property> + <property> + <name>javax.jdo.option.ConnectionURL</name> + <value>jdbc:derby:;databaseName=target/metastore_db;create=true</value> + </property> + <property> + <name>hive.metastore.sasl.enabled</name> + <value>false</value> + </property> + <property> + <name>datanucleus.schema.autoCreateAll</name> + <value>true</value> + </property> + <property> + <name>javax.jdo.option.ConnectionDriverName</name> + <value>org.apache.derby.jdbc.EmbeddedDriver</value> + </property> + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + <property> + <name>hive.metastore.schema.verification.record.version</name> + <value>false</value> + </property> + <property> + <name>datanucleus.transactionIsolation</name> + <value>read-committed</value> + </property> + <property> + <name>datanucleus.cache.level2</name> + <value>false</value> + </property> + <property> + <name>datanucleus.cache.level2.type</name> + <value>none</value> + </property> + <property> + <name>datanucleus.rdbms.useLegacyNativeValueStrategy</name> + <value>true</value> + <description/> + </property> + <property> + <name>hive.server2.table.type.mapping</name> + <value>CLASSIC</value> + </property> + <property> + <name>hive.metastore.warehouse.dir</name> + <value>file://${user.dir}/target/hive</value> + </property> + <property> + <name>hive.strict.checks.bucketing</name> + <value>false</value> + </property> +</configuration> \ No newline at end of file diff --git a/pom.xml b/pom.xml index 767f02c..ec6f843 100644 --- a/pom.xml +++ b/pom.xml @@ -31,9 +31,9 @@ <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> 
<name>Apache Gora</name> - <description>The Apache Gora open source framework provides an in-memory data model and - persistence for big data. Gora supports persisting to column stores, key value stores, - document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support. </description> <url>http://gora.apache.org</url> <inceptionYear>2010</inceptionYear> @@ -771,6 +771,7 @@ <module>gora-solr</module> <module>gora-aerospike</module> <module>gora-ignite</module> + <module>gora-hive</module> <module>gora-tutorial</module> <module>sources-dist</module> </modules> @@ -827,6 +828,12 @@ <orientdb.version>2.2.22</orientdb.version> <orientqb.version>0.2.0</orientqb.version> + <!-- HiveStore Dependencies --> + <metamodel.version>5.3.0</metamodel.version> + <json.version>20180813</json.version> + <hive.version>2.3.5</hive.version> + <hadoop-common.version>2.6.0</hadoop-common.version> + <!-- Testing Dependencies --> <junit.version>4.10</junit.version> <test.container.version>1.4.2</test.container.version> @@ -854,6 +861,7 @@ <pig.version>0.16.0</pig.version> <!-- General Properties --> + <!--suppress UnresolvedMavenProperty --> <implementation.build>${scmBranch}@r${buildNumber}</implementation.build> <javac.src.version>1.8</javac.src.version> <javac.target.version>1.8</javac.target.version> @@ -959,6 +967,11 @@ </dependency> <dependency> <groupId>org.apache.gora</groupId> + <artifactId>gora-hive</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.gora</groupId> <artifactId>gora-dynamodb</artifactId> <version>${project.version}</version> <type>test-jar</type> @@ -1697,6 +1710,62 @@ <artifactId>ignite-core</artifactId> 
<version>${ignite.version}</version> </dependency> + + <!-- Hive DataStore dependencies --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-jdbc</artifactId> + <version>${metamodel.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + <version>${metamodel.version}</version> + </dependency> + + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>${json.version}</version> + </dependency> </dependencies> </dependencyManagement>