HIVE-1555: JDBC Storage Handler (Gunther Hagleitner, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/12b27a35 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/12b27a35 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/12b27a35 Branch: refs/heads/master Commit: 12b27a355558499f6422e49742bd5cad71416fb2 Parents: a9de1cd Author: Gunther Hagleitner <gunt...@apache.org> Authored: Tue Feb 28 23:29:32 2017 -0800 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Tue Feb 28 23:55:05 2017 -0800 ---------------------------------------------------------------------- itests/qtest/pom.xml | 7 + .../test/resources/testconfiguration.properties | 1 + jdbc-handler/pom.xml | 127 ++++++++ .../hive/storage/jdbc/JdbcInputFormat.java | 108 +++++++ .../hive/storage/jdbc/JdbcInputSplit.java | 100 ++++++ .../hive/storage/jdbc/JdbcOutputFormat.java | 68 +++++ .../hive/storage/jdbc/JdbcRecordReader.java | 133 ++++++++ .../org/apache/hive/storage/jdbc/JdbcSerDe.java | 164 ++++++++++ .../hive/storage/jdbc/JdbcStorageHandler.java | 106 +++++++ .../storage/jdbc/QueryConditionBuilder.java | 186 ++++++++++++ .../storage/jdbc/conf/CustomConfigManager.java | 23 ++ .../jdbc/conf/CustomConfigManagerFactory.java | 50 +++ .../hive/storage/jdbc/conf/DatabaseType.java | 21 ++ .../storage/jdbc/conf/JdbcStorageConfig.java | 49 +++ .../jdbc/conf/JdbcStorageConfigManager.java | 97 ++++++ .../hive/storage/jdbc/dao/DatabaseAccessor.java | 34 +++ .../jdbc/dao/DatabaseAccessorFactory.java | 53 ++++ .../jdbc/dao/GenericJdbcDatabaseAccessor.java | 253 ++++++++++++++++ .../storage/jdbc/dao/JdbcRecordIterator.java | 104 +++++++ .../storage/jdbc/dao/MySqlDatabaseAccessor.java | 39 +++ .../HiveJdbcDatabaseAccessException.java | 41 +++ .../exception/HiveJdbcStorageException.java | 40 +++ .../src/test/java/org/apache/TestSuite.java | 29 ++ .../config/JdbcStorageConfigManagerTest.java | 87 ++++++ .../hive/storage/jdbc/JdbcInputFormatTest.java | 81 +++++ 
.../storage/jdbc/QueryConditionBuilderTest.java | 151 +++++++++ .../dao/GenericJdbcDatabaseAccessorTest.java | 206 +++++++++++++ jdbc-handler/src/test/resources/condition1.xml | 48 +++ jdbc-handler/src/test/resources/condition2.xml | 101 +++++++ jdbc-handler/src/test/resources/test_script.sql | 21 ++ packaging/pom.xml | 5 + packaging/src/main/assembly/src.xml | 1 + pom.xml | 3 + .../test/queries/clientpositive/jdbc_handler.q | 58 ++++ .../clientpositive/llap/jdbc_handler.q.out | 303 +++++++++++++++++++ 35 files changed, 2898 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/itests/qtest/pom.xml ---------------------------------------------------------------------- diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 1b49e88..1c3b601 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -119,6 +119,13 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc-handler</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- test inter-project --> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 778b614..807b124 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -499,6 +499,7 @@ minillaplocal.query.files=acid_globallimit.q,\ input16_cc.q,\ insert_dir_distcp.q,\ insert_into_with_schema.q,\ + jdbc_handler.q,\ join1.q,\ join_acid_non_acid.q,\ join_filters.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/pom.xml 
---------------------------------------------------------------------- diff --git a/jdbc-handler/pom.xml b/jdbc-handler/pom.xml new file mode 100644 index 0000000..364886a --- /dev/null +++ b/jdbc-handler/pom.xml @@ -0,0 +1,127 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hive</groupId> + <artifactId>hive</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>hive-jdbc-handler</artifactId> + <packaging>jar</packaging> + <name>Hive JDBC Handler</name> + + <properties> + <hive.path.to.root>..</hive.path.to.root> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty.aggregate</groupId> + <artifactId>jetty-all</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + 
<version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito-all.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>${h2database.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java ---------------------------------------------------------------------- diff --git
a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java new file mode 100644 index 0000000..bfa7a26 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java @@ -0,0 +1,108 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hive.storage.jdbc.dao.DatabaseAccessor; +import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory; + +import java.io.IOException; + +public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcInputFormat.class); + private DatabaseAccessor dbAccessor = null; + + + /** + * {@inheritDoc} + */ + @Override + public RecordReader<LongWritable, MapWritable> + getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + + if (!(split instanceof JdbcInputSplit)) 
{ + throw new RuntimeException("Incompatible split type " + split.getClass().getName() + "."); + } + + return new JdbcRecordReader(job, (JdbcInputSplit) split); + } + + + /** + * {@inheritDoc} + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + try { + if (numSplits <= 0) { + numSplits = 1; + } + LOGGER.debug("Creating {} input splits", numSplits); + if (dbAccessor == null) { + dbAccessor = DatabaseAccessorFactory.getAccessor(job); + } + + int numRecords = dbAccessor.getTotalNumberOfRecords(job); + int numRecordsPerSplit = numRecords / numSplits; + int numSplitsWithExtraRecords = numRecords % numSplits; + + LOGGER.debug("Num records = {}", numRecords); + InputSplit[] splits = new InputSplit[numSplits]; + Path[] tablePaths = FileInputFormat.getInputPaths(job); + + int offset = 0; + for (int i = 0; i < numSplits; i++) { + int numRecordsInThisSplit = numRecordsPerSplit; + if (i < numSplitsWithExtraRecords) { + numRecordsInThisSplit++; + } + + splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]); + offset += numRecordsInThisSplit; + } + + return splits; + } + catch (Exception e) { + LOGGER.error("Error while splitting input data.", e); + throw new IOException(e); + } + } + + + /** + * For testing purposes only + * + * @param dbAccessor + * DatabaseAccessor object + */ + public void setDbAccessor(DatabaseAccessor dbAccessor) { + this.dbAccessor = dbAccessor; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java new file mode 100644 index 0000000..a691cc2 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java @@ -0,0 +1,100 @@ +/* + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class JdbcInputSplit extends FileSplit implements InputSplit { + + private static final String[] EMPTY_ARRAY = new String[] {}; + + private int limit = 0; + private int offset = 0; + + + public JdbcInputSplit() { + super((Path) null, 0, 0, EMPTY_ARRAY); + + } + + + public JdbcInputSplit(long start, long end, Path dummyPath) { + super(dummyPath, 0, 0, EMPTY_ARRAY); + this.setLimit((int) start); + this.setOffset((int) end); + } + + + public JdbcInputSplit(int limit, int offset) { + super((Path) null, 0, 0, EMPTY_ARRAY); + this.limit = limit; + this.offset = offset; + } + + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(limit); + out.writeInt(offset); + } + + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + limit = in.readInt(); + offset = in.readInt(); + } + + + @Override + public long getLength() { + return limit; + } + + + @Override + public String[] getLocations() throws IOException { + return EMPTY_ARRAY; + } + + + public int getLimit() { + return limit; + } + + + public void setLimit(int limit) { + this.limit = limit; + } + + + public int getOffset() 
{ + return offset; + } + + + public void setOffset(int offset) { + this.offset = offset; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java new file mode 100644 index 0000000..26fb3cd --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java @@ -0,0 +1,68 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.Progressable; + +import java.io.IOException; +import java.util.Properties; + +public class JdbcOutputFormat implements OutputFormat<NullWritable, MapWritable>, + HiveOutputFormat<NullWritable, MapWritable> { + + /** + * {@inheritDoc} + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, + Path finalOutPath, + Class<? extends Writable> valueClass, + boolean isCompressed, + Properties tableProperties, + Progressable progress) throws IOException { + throw new UnsupportedOperationException("Write operations are not allowed."); + } + + + /** + * {@inheritDoc} + */ + @Override + public org.apache.hadoop.mapred.RecordWriter<NullWritable, MapWritable> getRecordWriter(FileSystem ignored, + JobConf job, + String name, + Progressable progress) throws IOException { + throw new UnsupportedOperationException("Write operations are not allowed."); + } + + + /** + * {@inheritDoc} + */ + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + // do nothing + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java new file mode 100644 index 0000000..0a24bd9 --- /dev/null +++ 
b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java @@ -0,0 +1,133 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hive.storage.jdbc.dao.DatabaseAccessor; +import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory; +import org.apache.hive.storage.jdbc.dao.JdbcRecordIterator; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordReader.class); + private DatabaseAccessor dbAccessor = null; + private JdbcRecordIterator iterator = null; + private JdbcInputSplit split = null; + private JobConf conf = null; + private int pos = 0; + + + public JdbcRecordReader(JobConf conf, JdbcInputSplit split) { + LOGGER.debug("Initializing JdbcRecordReader"); + this.split = split; + this.conf = conf; + } + + + @Override + public boolean next(LongWritable key, MapWritable value) throws IOException { + try { + LOGGER.debug("JdbcRecordReader.next called"); + if (dbAccessor == null) { + dbAccessor = 
DatabaseAccessorFactory.getAccessor(conf); + iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset()); + } + + if (iterator.hasNext()) { + LOGGER.debug("JdbcRecordReader has more records to read."); + key.set(pos); + pos++; + Map<String, String> record = iterator.next(); + if ((record != null) && (!record.isEmpty())) { + for (Entry<String, String> entry : record.entrySet()) { + value.put(new Text(entry.getKey()), new Text(entry.getValue())); + } + return true; + } + else { + LOGGER.debug("JdbcRecordReader got null record."); + return false; + } + } + else { + LOGGER.debug("JdbcRecordReader has no more records to read."); + return false; + } + } + catch (Exception e) { + LOGGER.error("An error occurred while reading the next record from DB.", e); + return false; + } + } + + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + + @Override + public MapWritable createValue() { + return new MapWritable(); + } + + + @Override + public long getPos() throws IOException { + return pos; + } + + + @Override + public void close() throws IOException { + if (iterator != null) { + iterator.close(); + } + } + + + @Override + public float getProgress() throws IOException { + if (split == null) { + return 0; + } + else { + return split.getLength() > 0 ? 
pos / (float) split.getLength() : 1.0f; + } + } + + + public void setDbAccessor(DatabaseAccessor dbAccessor) { + this.dbAccessor = dbAccessor; + } + + + public void setIterator(JdbcRecordIterator iterator) { + this.iterator = iterator; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java new file mode 100644 index 0000000..f35c33d --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java @@ -0,0 +1,164 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig; +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager; +import org.apache.hive.storage.jdbc.dao.DatabaseAccessor; +import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class JdbcSerDe extends AbstractSerDe { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSerDe.class); + + private StructObjectInspector objectInspector; + private int numColumns; + private String[] hiveColumnTypeArray; + private List<String> columnNames; + private List<String> row; + + + /* + * This method gets called multiple times by Hive. On some invocations, the properties will be empty. + * We need to detect when the properties are not empty to initialise the class variables. 
+ * + * @see org.apache.hadoop.hive.serde2.Deserializer#initialize(org.apache.hadoop.conf.Configuration, java.util.Properties) + */ + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + try { + LOGGER.debug("Initializing the SerDe"); + + // Hive cdh-4.3 does not provide the properties object on all calls + if (tbl.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) { + Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(tbl); + + DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig); + columnNames = dbAccessor.getColumnNames(tableConfig); + numColumns = columnNames.size(); + + String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ","); + if (numColumns != hiveColumnNameArray.length) { + throw new SerDeException("Expected " + numColumns + " columns. Table definition has " + + hiveColumnNameArray.length + " columns"); + } + List<String> hiveColumnNames = Arrays.asList(hiveColumnNameArray); + + hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":"); + if (hiveColumnTypeArray.length == 0) { + throw new SerDeException("Received an empty Hive column type definition"); + } + + List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>(numColumns); + for (int i = 0; i < numColumns; i++) { + fieldInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + + objectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(hiveColumnNames, + fieldInspectors); + row = new ArrayList<String>(numColumns); + } + } + catch (Exception e) { + LOGGER.error("Caught exception while initializing the SqlSerDe", e); + throw new SerDeException(e); + } + } + + + private String[] parseProperty(String propertyValue, String delimiter) { + if ((propertyValue == null) || (propertyValue.trim().isEmpty())) { + return new String[] {}; + } + + return 
propertyValue.split(delimiter); + } + + + @Override + public Object deserialize(Writable blob) throws SerDeException { + LOGGER.debug("Deserializing from SerDe"); + if (!(blob instanceof MapWritable)) { + throw new SerDeException("Expected MapWritable. Got " + blob.getClass().getName()); + } + + if ((row == null) || (columnNames == null)) { + throw new SerDeException("JDBC SerDe hasn't been initialized properly"); + } + + row.clear(); + MapWritable input = (MapWritable) blob; + Text columnKey = new Text(); + + for (int i = 0; i < numColumns; i++) { + columnKey.set(columnNames.get(i)); + Writable value = input.get(columnKey); + if (value == null) { + row.add(null); + } + else { + row.add(value.toString()); + } + } + + return row; + } + + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return objectInspector; + } + + + @Override + public Class<? extends Writable> getSerializedClass() { + return MapWritable.class; + } + + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + throw new UnsupportedOperationException("Writes are not allowed"); + } + + + @Override + public SerDeStats getSerDeStats() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java new file mode 100644 index 0000000..946ee0c --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java @@ -0,0 +1,106 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; + +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager; + +import java.util.Map; +import java.util.Properties; + +public class JdbcStorageHandler implements HiveStorageHandler { + + private Configuration conf; + + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + + @Override + public Configuration getConf() { + return this.conf; + } + + + @SuppressWarnings("rawtypes") + @Override + public Class<? extends InputFormat> getInputFormatClass() { + return JdbcInputFormat.class; + } + + + @SuppressWarnings("rawtypes") + @Override + public Class<? extends OutputFormat> getOutputFormatClass() { + return JdbcOutputFormat.class; + } + + + @Override + public Class<? 
extends AbstractSerDe> getSerDeClass() { + return JdbcSerDe.class; + } + + + @Override + public HiveMetaHook getMetaHook() { + return null; + } + + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + Properties properties = tableDesc.getProperties(); + JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties); + } + + + @Override + public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + Properties properties = tableDesc.getProperties(); + JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties); + } + + + @Override + public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + // Nothing to do here... + } + + + @Override + public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + return null; + } + + @Override + public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java new file mode 100644 index 0000000..194fad8 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java @@ -0,0 +1,186 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig; + +import java.beans.XMLDecoder; +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.Map; + +/** + * Translates the hive query condition into a condition that can be run on the underlying database + */ +public class QueryConditionBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(QueryConditionBuilder.class); + private static final String EMPTY_STRING = ""; + private static QueryConditionBuilder instance = null; + + + public static QueryConditionBuilder getInstance() { + if (instance == null) { + instance = new QueryConditionBuilder(); + } + + return instance; + } + + + private QueryConditionBuilder() { + + } + + + public String buildCondition(Configuration conf) { + if (conf == null) { + return EMPTY_STRING; + } + + String filterXml = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + String hiveColumns = conf.get(serdeConstants.LIST_COLUMNS); + String columnMapping = conf.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName()); + + if ((filterXml == null) || ((columnMapping == null) && (hiveColumns == null))) { + return EMPTY_STRING; + } + + if (hiveColumns == null) { + 
hiveColumns = ""; + } + + Map<String, String> columnMap = buildColumnMapping(columnMapping, hiveColumns); + String condition = createConditionString(filterXml, columnMap); + return condition; + } + + + /* + * Build a Hive-to-X column mapping, + * + */ + private Map<String, String> buildColumnMapping(String columnMapping, String hiveColumns) { + if ((columnMapping == null) || (columnMapping.trim().isEmpty())) { + return createIdentityMap(hiveColumns); + } + + Map<String, String> columnMap = new HashMap<String, String>(); + String[] mappingPairs = columnMapping.toLowerCase().split(","); + for (String mapPair : mappingPairs) { + String[] columns = mapPair.split("="); + columnMap.put(columns[0].trim(), columns[1].trim()); + } + + return columnMap; + } + + + /* + * When no mapping is defined, it is assumed that the hive column names are equivalent to the column names in the + * underlying table + */ + private Map<String, String> createIdentityMap(String hiveColumns) { + Map<String, String> columnMap = new HashMap<String, String>(); + String[] columns = hiveColumns.toLowerCase().split(","); + + for (String col : columns) { + columnMap.put(col.trim(), col.trim()); + } + + return columnMap; + } + + + /* + * Walk to Hive AST and translate the hive column names to their equivalent mappings. This is basically a cheat. 
+ * + */ + private String createConditionString(String filterXml, Map<String, String> columnMap) { + if ((filterXml == null) || (filterXml.trim().isEmpty())) { + return EMPTY_STRING; + } + + try (XMLDecoder decoder = new XMLDecoder(new ByteArrayInputStream(filterXml.getBytes("UTF-8")))) { + Object object = decoder.readObject(); + if (!(object instanceof ExprNodeDesc)) { + LOGGER.error("Deserialized filter expression is not of the expected type"); + throw new RuntimeException("Deserialized filter expression is not of the expected type"); + } + + ExprNodeDesc conditionNode = (ExprNodeDesc) object; + walkTreeAndTranslateColumnNames(conditionNode, columnMap); + return conditionNode.getExprString(); + } + catch (Exception e) { + LOGGER.error("Error during condition build", e); + return EMPTY_STRING; + } + } + + + /* + * Translate column names by walking the AST + */ + private void walkTreeAndTranslateColumnNames(ExprNodeDesc node, Map<String, String> columnMap) { + if (node == null) { + return; + } + + if (node instanceof ExprNodeColumnDesc) { + ExprNodeColumnDesc column = (ExprNodeColumnDesc) node; + String hiveColumnName = column.getColumn().toLowerCase(); + if (columnMap.containsKey(hiveColumnName)) { + String dbColumnName = columnMap.get(hiveColumnName); + String finalName = formatColumnName(dbColumnName); + column.setColumn(finalName); + } + } + else { + if (node.getChildren() != null) { + for (ExprNodeDesc childNode : node.getChildren()) { + walkTreeAndTranslateColumnNames(childNode, columnMap); + } + } + } + } + + + /** + * This is an ugly hack for handling date column types because Hive doesn't have a built-in type for dates + */ + private String formatColumnName(String dbColumnName) { + if (dbColumnName.contains(":")) { + String[] typeSplit = dbColumnName.split(":"); + + if (typeSplit[1].equalsIgnoreCase("date")) { + return "{d " + typeSplit[0] + "}"; + } + + return typeSplit[0]; + } + else { + return dbColumnName; + } + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java new file mode 100644 index 0000000..7a787d4 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java @@ -0,0 +1,23 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.hive.storage.jdbc.conf;

import java.util.Properties;

/**
 * Hook for database-specific configuration validation. Implementations check that all
 * properties a particular database type needs are present in the table definition;
 * implementations are expected to raise an unchecked exception when one is missing.
 */
public interface CustomConfigManager {

  /**
   * Validates the given table/job properties for this database type.
   *
   * @param properties the table properties to check
   */
  void checkRequiredProperties(Properties properties);

}
+ */ +package org.apache.hive.storage.jdbc.conf; + +import java.util.Properties; + +/** + * Factory for creating custom config managers based on the database type + */ +public class CustomConfigManagerFactory { + + private static CustomConfigManager nopConfigManager = new NopCustomConfigManager(); + + + private CustomConfigManagerFactory() { + } + + + public static CustomConfigManager getCustomConfigManagerFor(DatabaseType databaseType) { + switch (databaseType) { + case MYSQL: + return nopConfigManager; + + default: + return nopConfigManager; + } + } + + private static class NopCustomConfigManager implements CustomConfigManager { + + @Override + public void checkRequiredProperties(Properties properties) { + return; + } + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java new file mode 100644 index 0000000..a2bdbe4 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java @@ -0,0 +1,21 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.storage.jdbc.conf; + +public enum DatabaseType { + MYSQL, + H2, + DERBY +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java new file mode 100644 index 0000000..ff6357d --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java @@ -0,0 +1,49 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.conf; + +public enum JdbcStorageConfig { + DATABASE_TYPE("database.type", true), + JDBC_URL("jdbc.url", true), + JDBC_DRIVER_CLASS("jdbc.driver", true), + QUERY("query", true), + JDBC_FETCH_SIZE("jdbc.fetch.size", false), + COLUMN_MAPPING("column.mapping", false); + + private String propertyName; + private boolean required = false; + + + JdbcStorageConfig(String propertyName, boolean required) { + this.propertyName = propertyName; + this.required = required; + } + + + JdbcStorageConfig(String propertyName) { + this.propertyName = propertyName; + } + + + public String getPropertyName() { + return JdbcStorageConfigManager.CONFIG_PREFIX + "." 
+ propertyName; + } + + + public boolean isRequired() { + return required; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java new file mode 100644 index 0000000..5267cda --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java @@ -0,0 +1,97 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.storage.jdbc.conf; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hive.storage.jdbc.QueryConditionBuilder; + +import java.util.EnumSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * Main configuration handler class + */ +public class JdbcStorageConfigManager { + + public static final String CONFIG_PREFIX = "hive.sql"; + private static final EnumSet<JdbcStorageConfig> DEFAULT_REQUIRED_PROPERTIES = + EnumSet.of(JdbcStorageConfig.DATABASE_TYPE, + JdbcStorageConfig.JDBC_URL, + JdbcStorageConfig.JDBC_DRIVER_CLASS, + JdbcStorageConfig.QUERY); + + + private JdbcStorageConfigManager() { + } + + + public static void copyConfigurationToJob(Properties props, Map<String, String> jobProps) { + checkRequiredPropertiesAreDefined(props); + for (Entry<Object, Object> entry : props.entrySet()) { + jobProps.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); + } + } + + + public static Configuration convertPropertiesToConfiguration(Properties props) { + checkRequiredPropertiesAreDefined(props); + Configuration conf = new Configuration(); + + for (Entry<Object, Object> entry : props.entrySet()) { + conf.set(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); + } + + return conf; + } + + + private static void checkRequiredPropertiesAreDefined(Properties props) { + for (JdbcStorageConfig configKey : DEFAULT_REQUIRED_PROPERTIES) { + String propertyKey = configKey.getPropertyName(); + if ((props == null) || (!props.containsKey(propertyKey)) || (isEmptyString(props.getProperty(propertyKey)))) { + throw new IllegalArgumentException("Property " + propertyKey + " is required."); + } + } + + DatabaseType dbType = DatabaseType.valueOf(props.getProperty(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())); + CustomConfigManager configManager = CustomConfigManagerFactory.getCustomConfigManagerFor(dbType); + configManager.checkRequiredProperties(props); + } + + + 
public static String getConfigValue(JdbcStorageConfig key, Configuration config) { + return config.get(key.getPropertyName()); + } + + + public static String getQueryToExecute(Configuration config) { + String query = config.get(JdbcStorageConfig.QUERY.getPropertyName()); + String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config); + if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) { + query = query + " WHERE " + hiveFilterCondition; + } + + return query; + } + + + private static boolean isEmptyString(String value) { + return ((value == null) || (value.trim().isEmpty())); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java new file mode 100644 index 0000000..f50d53e --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java @@ -0,0 +1,34 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.hive.storage.jdbc.dao;

import org.apache.hadoop.conf.Configuration;

import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;

import java.util.List;

/**
 * Abstraction over the underlying JDBC data source: the operations the input format
 * and record reader need in order to split and read the configured query.
 */
public interface DatabaseAccessor {

  /** Returns the column names produced by the configured query. */
  List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException;


  /** Returns the total number of rows the configured query produces (used for splitting). */
  int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;


  /**
   * Returns an iterator over a window of the query results. The caller owns the
   * returned iterator and must close() it to release the DB resources.
   *
   * @param limit  maximum number of rows to read
   * @param offset number of rows to skip
   */
  JdbcRecordIterator
    getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;

}
 */
package org.apache.hive.storage.jdbc.dao;

import org.apache.hadoop.conf.Configuration;

import org.apache.hive.storage.jdbc.conf.DatabaseType;
import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;

/**
 * Factory for creating the correct DatabaseAccessor class for the job
 */
public class DatabaseAccessorFactory {

  private DatabaseAccessorFactory() {
  }


  /**
   * Returns the dialect-specific accessor for the given database type; types without a
   * dedicated implementation fall back to the generic JDBC accessor.
   */
  public static DatabaseAccessor getAccessor(DatabaseType dbType) {

    DatabaseAccessor accessor = null;
    switch (dbType) {
    case MYSQL:
      // MySQL needs its own LIMIT/OFFSET syntax; see MySqlDatabaseAccessor.
      accessor = new MySqlDatabaseAccessor();
      break;

    default:
      accessor = new GenericJdbcDatabaseAccessor();
      break;
    }

    return accessor;
  }


  /**
   * Convenience overload reading the database type from the job configuration.
   * NOTE(review): DatabaseType.valueOf is case-sensitive, so the configured value must
   * exactly match an enum constant (e.g. "MYSQL") — confirm this is the intended contract.
   */
  public static DatabaseAccessor getAccessor(Configuration conf) {
    DatabaseType dbType = DatabaseType.valueOf(conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()));
    return getAccessor(dbType);
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.dao; + +import org.apache.commons.dbcp.BasicDataSourceFactory; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig; +import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager; +import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * A data accessor that should in theory work with all JDBC compliant database drivers. + */ +public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { + + protected static final String DBCP_CONFIG_PREFIX = JdbcStorageConfigManager.CONFIG_PREFIX + ".dbcp"; + protected static final int DEFAULT_FETCH_SIZE = 1000; + protected static final Logger LOGGER = LoggerFactory.getLogger(GenericJdbcDatabaseAccessor.class); + protected DataSource dbcpDataSource = null; + + + public GenericJdbcDatabaseAccessor() { + } + + + @Override + public List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + initializeDatabaseConnection(conf); + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + String metadataQuery = addLimitToQuery(sql, 1); + LOGGER.debug("Query to execute is [{}]", metadataQuery); + + conn = dbcpDataSource.getConnection(); + ps = conn.prepareStatement(metadataQuery); + rs = ps.executeQuery(); + + ResultSetMetaData metadata = rs.getMetaData(); + int numColumns = 
metadata.getColumnCount(); + List<String> columnNames = new ArrayList<String>(numColumns); + for (int i = 0; i < numColumns; i++) { + columnNames.add(metadata.getColumnName(i + 1)); + } + + return columnNames; + } + catch (Exception e) { + LOGGER.error("Error while trying to get column names.", e); + throw new HiveJdbcDatabaseAccessException("Error while trying to get column names: " + e.getMessage(), e); + } + finally { + cleanupResources(conn, ps, rs); + } + + } + + + @Override + public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + initializeDatabaseConnection(conf); + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable"; + LOGGER.debug("Query to execute is [{}]", countQuery); + + conn = dbcpDataSource.getConnection(); + ps = conn.prepareStatement(countQuery); + rs = ps.executeQuery(); + if (rs.next()) { + return rs.getInt(1); + } + else { + LOGGER.warn("The count query did not return any results.", countQuery); + throw new HiveJdbcDatabaseAccessException("Count query did not return any results."); + } + } + catch (HiveJdbcDatabaseAccessException he) { + throw he; + } + catch (Exception e) { + LOGGER.error("Caught exception while trying to get the number of records", e); + throw new HiveJdbcDatabaseAccessException(e); + } + finally { + cleanupResources(conn, ps, rs); + } + } + + + @Override + public JdbcRecordIterator + getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException { + + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + initializeDatabaseConnection(conf); + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + String limitQuery = addLimitAndOffsetToQuery(sql, limit, offset); + LOGGER.debug("Query to execute is [{}]", limitQuery); + + conn = 
dbcpDataSource.getConnection(); + ps = conn.prepareStatement(limitQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ps.setFetchSize(getFetchSize(conf)); + rs = ps.executeQuery(); + + return new JdbcRecordIterator(conn, ps, rs); + } + catch (Exception e) { + LOGGER.error("Caught exception while trying to execute query", e); + cleanupResources(conn, ps, rs); + throw new HiveJdbcDatabaseAccessException("Caught exception while trying to execute query", e); + } + } + + + /** + * Uses generic JDBC escape functions to add a limit and offset clause to a query string + * + * @param sql + * @param limit + * @param offset + * @return + */ + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } + else { + return sql + " {LIMIT " + limit + " OFFSET " + offset + "}"; + } + } + + + /* + * Uses generic JDBC escape functions to add a limit clause to a query string + */ + protected String addLimitToQuery(String sql, int limit) { + return sql + " {LIMIT " + limit + "}"; + } + + + protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + LOGGER.warn("Caught exception during resultset cleanup.", e); + } + + try { + if (ps != null) { + ps.close(); + } + } catch (SQLException e) { + LOGGER.warn("Caught exception during statement cleanup.", e); + } + + try { + if (conn != null) { + conn.close(); + } + } catch (SQLException e) { + LOGGER.warn("Caught exception during connection cleanup.", e); + } + } + + protected void initializeDatabaseConnection(Configuration conf) throws Exception { + if (dbcpDataSource == null) { + synchronized (this) { + if (dbcpDataSource == null) { + Properties props = getConnectionPoolProperties(conf); + dbcpDataSource = BasicDataSourceFactory.createDataSource(props); + } + } + } + } + + + protected Properties getConnectionPoolProperties(Configuration conf) 
{ + // Create the default properties object + Properties dbProperties = getDefaultDBCPProperties(); + + // override with user defined properties + Map<String, String> userProperties = conf.getValByRegex(DBCP_CONFIG_PREFIX + "\\.*"); + if ((userProperties != null) && (!userProperties.isEmpty())) { + for (Entry<String, String> entry : userProperties.entrySet()) { + dbProperties.put(entry.getKey().replaceFirst(DBCP_CONFIG_PREFIX + "\\.", ""), entry.getValue()); + } + } + + // essential properties that shouldn't be overridden by users + dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName())); + dbProperties.put("driverClassName", conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName())); + dbProperties.put("type", "javax.sql.DataSource"); + return dbProperties; + } + + + protected Properties getDefaultDBCPProperties() { + Properties props = new Properties(); + props.put("initialSize", "1"); + props.put("maxActive", "3"); + props.put("maxIdle", "0"); + props.put("maxWait", "10000"); + props.put("timeBetweenEvictionRunsMillis", "30000"); + return props; + } + + + protected int getFetchSize(Configuration conf) { + return conf.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java new file mode 100644 index 0000000..4262502 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java @@ -0,0 +1,104 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.dao; + +import org.apache.hadoop.io.NullWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * An iterator that allows iterating through a SQL resultset. Includes methods to clear up resources. + */ +public class JdbcRecordIterator implements Iterator<Map<String, String>> { + + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordIterator.class); + + private Connection conn; + private PreparedStatement ps; + private ResultSet rs; + + + public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs) { + this.conn = conn; + this.ps = ps; + this.rs = rs; + } + + + @Override + public boolean hasNext() { + try { + return rs.next(); + } + catch (Exception se) { + LOGGER.warn("hasNext() threw exception", se); + return false; + } + } + + + @Override + public Map<String, String> next() { + try { + ResultSetMetaData metadata = rs.getMetaData(); + int numColumns = metadata.getColumnCount(); + Map<String, String> record = new HashMap<String, String>(numColumns); + for (int i = 0; i < numColumns; i++) { + String key = metadata.getColumnName(i + 1); + String value = rs.getString(i + 1); + if (value == null) { + value = NullWritable.get().toString(); + } + record.put(key, value); + } + + return record; + } + catch (Exception e) { + LOGGER.warn("next() threw 
exception", e); + return null; + } + } + + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + } + + + /** + * Release all DB resources + */ + public void close() { + try { + rs.close(); + ps.close(); + conn.close(); + } + catch (Exception e) { + LOGGER.warn("Caught exception while trying to close database objects", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java new file mode 100644 index 0000000..7d821d8 --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java @@ -0,0 +1,39 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.storage.jdbc.dao; + +/** + * MySQL specific data accessor. 
This is needed because MySQL JDBC drivers do not support generic LIMIT and OFFSET + * escape functions + */ +public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor { + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } + else { + return sql + " LIMIT " + limit + "," + offset; + } + } + + + @Override + protected String addLimitToQuery(String sql, int limit) { + return sql + " LIMIT " + limit; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java new file mode 100644 index 0000000..cde859f --- /dev/null +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java @@ -0,0 +1,41 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.hive.storage.jdbc.exception;

/**
 * Signals a failure while accessing the underlying database: connection setup,
 * query execution, counting, or metadata retrieval.
 */
public class HiveJdbcDatabaseAccessException extends HiveJdbcStorageException {

  private static final long serialVersionUID = -4106595742876276803L;


  public HiveJdbcDatabaseAccessException() {
    super();
  }


  public HiveJdbcDatabaseAccessException(String message, Throwable cause) {
    super(message, cause);
  }


  public HiveJdbcDatabaseAccessException(String message) {
    super(message);
  }


  public HiveJdbcDatabaseAccessException(Throwable cause) {
    super(cause);
  }

}
+ */
+package org.apache.hive.storage.jdbc.exception;
+
+/**
+ * Base checked exception for the Hive JDBC storage handler. All
+ * handler-specific exceptions (e.g. {@code HiveJdbcDatabaseAccessException})
+ * extend this type so callers can catch handler failures uniformly.
+ */
+public class HiveJdbcStorageException extends Exception {
+
+  private static final long serialVersionUID = 4858210651037826401L;
+
+
+  public HiveJdbcStorageException() {
+    super();
+  }
+
+
+  public HiveJdbcStorageException(String message) {
+    super(message);
+  }
+
+
+  public HiveJdbcStorageException(Throwable cause) {
+    super(cause);
+  }
+
+
+  public HiveJdbcStorageException(String message, Throwable cause) {
+    super(message, cause);
+  }
+} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/TestSuite.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/test/java/org/apache/TestSuite.java b/jdbc-handler/src/test/java/org/apache/TestSuite.java new file mode 100644 index 0000000..df8eab7 --- /dev/null +++ b/jdbc-handler/src/test/java/org/apache/TestSuite.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+import org.apache.hive.config.JdbcStorageConfigManagerTest;
+import org.apache.hive.storage.jdbc.QueryConditionBuilderTest;
+import org.apache.hive.storage.jdbc.dao.GenericJdbcDatabaseAccessorTest;
+
+/**
+ * JUnit 4 suite aggregating the jdbc-handler unit tests.
+ *
+ * NOTE(review): the declared package (org.apache.hive.storage.jdbc) does not
+ * match the file location shown in the diff header
+ * (src/test/java/org/apache/TestSuite.java) — confirm the path, since standard
+ * source-layout rules require package and directory to agree.
+ * NOTE(review): JdbcInputFormatTest is not listed in this suite — presumably
+ * intentional (it is picked up directly by surefire); verify.
+ */
+@RunWith(Suite.class)
+@SuiteClasses({ JdbcStorageConfigManagerTest.class, GenericJdbcDatabaseAccessorTest.class,
+    QueryConditionBuilderTest.class })
+public class TestSuite {
+} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java b/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java new file mode 100644 index 0000000..c950831 --- /dev/null +++ b/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java @@ -0,0 +1,87 @@ +/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.config;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+import org.apache.hive.storage.jdbc.conf.DatabaseType;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Unit tests for {@link JdbcStorageConfigManager#copyConfigurationToJob}:
+ * verifies that all storage-handler properties are copied into the job map
+ * when the required settings are present, and that missing or unknown
+ * required settings are rejected with IllegalArgumentException.
+ */
+public class JdbcStorageConfigManagerTest {
+
+  // Happy path: all four required settings present; all are copied verbatim.
+  @Test
+  public void testWithAllRequiredSettingsDefined() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), DatabaseType.MYSQL.toString());
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+    props.put(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName(), "com.mysql.jdbc.Driver");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+
+    assertThat(jobMap, is(notNullValue()));
+    // Exactly the four supplied properties should be present — no extras.
+    assertThat(jobMap.size(), is(equalTo(4)));
+    assertThat(jobMap.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()), is(equalTo("MYSQL")));
+    assertThat(jobMap.get(JdbcStorageConfig.JDBC_URL.getPropertyName()), is(equalTo("jdbc://localhost:3306/hive")));
+    assertThat(jobMap.get(JdbcStorageConfig.QUERY.getPropertyName()),
+        is(equalTo("SELECT col1,col2,col3 FROM sometable")));
+  }
+
+
+  // Required JDBC URL absent -> copyConfigurationToJob must throw.
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithJdbcUrlMissing() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), DatabaseType.MYSQL.toString());
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+
+  // Required database type absent -> must throw.
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithDatabaseTypeMissing() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+
+  // "Postgres" is not a DatabaseType enum constant -> must throw.
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithUnknownDatabaseType() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), "Postgres");
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java new file mode 100644 index 0000000..cc6acf1 --- /dev/null +++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java @@ -0,0 +1,81 @@ +/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for {@link JdbcInputFormat#getSplits}: the DatabaseAccessor is
+ * mocked to report a fixed row count, and the tests verify how the total is
+ * partitioned across the requested number of splits.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcInputFormatTest {
+
+  // Mocked row-count source; injected via setDbAccessor so no real DB is needed.
+  @Mock
+  private DatabaseAccessor mockDatabaseAccessor;
+
+
+  // 15 rows over 3 splits divides evenly: each split covers 5 rows.
+  @Test
+  public void testSplitLogic_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getTotalNumberOfRecords(any(Configuration.class))).thenReturn(15);
+    f.setDbAccessor(mockDatabaseAccessor);
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    InputSplit[] splits = f.getSplits(conf, 3);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertThat(splits[0].getLength(), is(5L));
+  }
+
+
+  // 15 rows over 6 splits does not divide evenly (15 = 6*2 + 3): the
+  // remainder spills over, giving 3 splits of 3 rows and 3 splits of 2 rows.
+  @Test
+  public void testSplitLogic_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+    JdbcInputFormat f = new JdbcInputFormat();
+
+    when(mockDatabaseAccessor.getTotalNumberOfRecords(any(Configuration.class))).thenReturn(15);
+    f.setDbAccessor(mockDatabaseAccessor);
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    InputSplit[] splits = f.getSplits(conf, 6);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(6));
+
+    for (int i = 0; i < 3; i++) {
+      assertThat(splits[i].getLength(), is(3L));
+    }
+
+    for (int i = 3; i < 6; i++) {
+      assertThat(splits[i].getLength(), is(2L));
+    }
+  }
+} http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java new file mode 100644 index 0000000..5cdae47 --- /dev/null +++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java @@ -0,0 +1,151 @@ +/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import static org.hamcrest.Matchers.equalToIgnoringWhiteSpace;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+
+import java.io.IOException;
+import java.util.Scanner;
+
+/**
+ * Unit tests for {@link QueryConditionBuilder}: serialized Hive filter
+ * expressions (loaded from condition1.xml / condition2.xml test resources)
+ * are translated into SQL WHERE-clause text, optionally applying the
+ * hive-to-database column-name mapping from the COLUMN_MAPPING property.
+ */
+public class QueryConditionBuilderTest {
+
+  // Serialized filter expressions: condition1 = single predicate,
+  // condition2 = two predicates joined with AND.
+  private static String condition1;
+  private static String condition2;
+
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    condition1 = readFileContents("condition1.xml");
+    condition2 = readFileContents("condition2.xml");
+  }
+
+
+  // Reads an entire classpath resource into a String ("\\Z" = end of input).
+  private static String readFileContents(String name) throws IOException {
+    try (Scanner s = new Scanner(QueryConditionBuilderTest.class.getClassLoader().getResourceAsStream(name))) {
+      return s.useDelimiter("\\Z").next();
+    }
+  }
+
+
+  // No column mapping configured: hive column names pass through unchanged.
+  @Test
+  public void testSimpleCondition_noTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("(visitor_id = 'x')")));
+  }
+
+
+  // Column mapping renames visitor_id -> vid in the generated condition.
+  @Test
+  public void testSimpleCondition_withTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=vid, sentiment=sentiment, tracking_id=tracking_id");
+    String condition =
+        QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("(vid = 'x')")));
+  }
+
+
+  // A ":date" type hint in the mapping wraps the column in the JDBC date
+  // escape syntax {d ...}.
+  @Test
+  public void testSimpleCondition_withDateType() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=vid:date, sentiment=sentiment, tracking_id=tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("({d vid} = 'x')")));
+  }
+
+
+  // Mapping lookup is case-insensitive: mixed-case hive/db names still match,
+  // and the output column name is lower-cased.
+  @Test
+  public void testSimpleCondition_withVariedCaseMappings() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_ID,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=VID:date, sentiment=sentiment, tracking_id=tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("({d vid} = 'x')")));
+  }
+
+
+  // Compound filter (condition2): both predicates survive, joined with "and".
+  @Test
+  public void testMultipleConditions_noTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition2);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("((visitor_id = 'x') and (sentiment = 'y'))")));
+  }
+
+
+  // Compound filter with column mapping applied to every referenced column.
+  @Test
+  public void testMultipleConditions_withTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
+        condition2);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(), "visitor_id=v,sentiment=s,tracking_id=t");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("((v = 'x') and (s = 'y'))")));
+  }
+
+
+  // A null Configuration must yield an empty (but non-null) condition.
+  @Test
+  public void testWithNullConf() {
+    String condition = QueryConditionBuilder.getInstance().buildCondition(null);
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition.trim().isEmpty(), is(true));
+  }
+
+
+  // No filter expression configured -> empty (but non-null) condition.
+  @Test
+  public void testWithUndefinedFilterExpr() {
+    Configuration conf = new Configuration();
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(), "visitor_id=v,sentiment=s,tracking_id=t");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition.trim().isEmpty(), is(true));
+  }
+
+}