HIVE-1555: JDBC Storage Handler (Gunther Hagleitner, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/12b27a35
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/12b27a35
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/12b27a35

Branch: refs/heads/master
Commit: 12b27a355558499f6422e49742bd5cad71416fb2
Parents: a9de1cd
Author: Gunther Hagleitner <gunt...@apache.org>
Authored: Tue Feb 28 23:29:32 2017 -0800
Committer: Gunther Hagleitner <gunt...@apache.org>
Committed: Tue Feb 28 23:55:05 2017 -0800

----------------------------------------------------------------------
 itests/qtest/pom.xml                            |   7 +
 .../test/resources/testconfiguration.properties |   1 +
 jdbc-handler/pom.xml                            | 127 ++++++++
 .../hive/storage/jdbc/JdbcInputFormat.java      | 108 +++++++
 .../hive/storage/jdbc/JdbcInputSplit.java       | 100 ++++++
 .../hive/storage/jdbc/JdbcOutputFormat.java     |  68 +++++
 .../hive/storage/jdbc/JdbcRecordReader.java     | 133 ++++++++
 .../org/apache/hive/storage/jdbc/JdbcSerDe.java | 164 ++++++++++
 .../hive/storage/jdbc/JdbcStorageHandler.java   | 106 +++++++
 .../storage/jdbc/QueryConditionBuilder.java     | 186 ++++++++++++
 .../storage/jdbc/conf/CustomConfigManager.java  |  23 ++
 .../jdbc/conf/CustomConfigManagerFactory.java   |  50 +++
 .../hive/storage/jdbc/conf/DatabaseType.java    |  21 ++
 .../storage/jdbc/conf/JdbcStorageConfig.java    |  49 +++
 .../jdbc/conf/JdbcStorageConfigManager.java     |  97 ++++++
 .../hive/storage/jdbc/dao/DatabaseAccessor.java |  34 +++
 .../jdbc/dao/DatabaseAccessorFactory.java       |  53 ++++
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java   | 253 ++++++++++++++++
 .../storage/jdbc/dao/JdbcRecordIterator.java    | 104 +++++++
 .../storage/jdbc/dao/MySqlDatabaseAccessor.java |  39 +++
 .../HiveJdbcDatabaseAccessException.java        |  41 +++
 .../exception/HiveJdbcStorageException.java     |  40 +++
 .../src/test/java/org/apache/TestSuite.java     |  29 ++
 .../config/JdbcStorageConfigManagerTest.java    |  87 ++++++
 .../hive/storage/jdbc/JdbcInputFormatTest.java  |  81 +++++
 .../storage/jdbc/QueryConditionBuilderTest.java | 151 +++++++++
 .../dao/GenericJdbcDatabaseAccessorTest.java    | 206 +++++++++++++
 jdbc-handler/src/test/resources/condition1.xml  |  48 +++
 jdbc-handler/src/test/resources/condition2.xml  | 101 +++++++
 jdbc-handler/src/test/resources/test_script.sql |  21 ++
 packaging/pom.xml                               |   5 +
 packaging/src/main/assembly/src.xml             |   1 +
 pom.xml                                         |   3 +
 .../test/queries/clientpositive/jdbc_handler.q  |  58 ++++
 .../clientpositive/llap/jdbc_handler.q.out      | 303 +++++++++++++++++++
 35 files changed, 2898 insertions(+)
----------------------------------------------------------------------
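
The handler introduced here is configured entirely through table properties under the "hive.sql" prefix (defined in JdbcStorageConfig below). As orientation before the diff, here is a minimal sketch of how those properties feed the config manager; the connection values are illustrative, and only the property keys and the copyConfigurationToJob signature come from this patch:

    import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class JdbcHandlerConfigExample {
      public static void main(String[] args) {
        Properties tableProps = new Properties();
        // The four required keys; omitting any of them raises IllegalArgumentException.
        tableProps.setProperty("hive.sql.database.type", "MYSQL");
        tableProps.setProperty("hive.sql.jdbc.url", "jdbc:mysql://localhost/test"); // illustrative
        tableProps.setProperty("hive.sql.jdbc.driver", "com.mysql.jdbc.Driver");    // illustrative
        tableProps.setProperty("hive.sql.query", "SELECT id, name FROM persons");   // illustrative

        Map<String, String> jobProps = new HashMap<String, String>();
        JdbcStorageConfigManager.copyConfigurationToJob(tableProps, jobProps);
        System.out.println(jobProps); // every hive.sql.* entry is copied through
      }
    }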


http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 1b49e88..1c3b601 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -119,6 +119,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc-handler</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
 
     <!-- test inter-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 778b614..807b124 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -499,6 +499,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   input16_cc.q,\
   insert_dir_distcp.q,\
   insert_into_with_schema.q,\
+  jdbc_handler.q,\
   join1.q,\
   join_acid_non_acid.q,\
   join_filters.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc-handler/pom.xml b/jdbc-handler/pom.xml
new file mode 100644
index 0000000..364886a
--- /dev/null
+++ b/jdbc-handler/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-jdbc-handler</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive JDBC Handler</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.aggregate</groupId>
+          <artifactId>jetty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2database.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
new file mode 100644
index 0000000..bfa7a26
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+
+import java.io.IOException;
+
+public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcInputFormat.class);
+  private DatabaseAccessor dbAccessor = null;
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public RecordReader<LongWritable, MapWritable>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+    if (!(split instanceof JdbcInputSplit)) {
+      throw new RuntimeException("Incompatible split type " + split.getClass().getName() + ".");
+    }
+
+    return new JdbcRecordReader(job, (JdbcInputSplit) split);
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    try {
+      if (numSplits <= 0) {
+        numSplits = 1;
+      }
+      LOGGER.debug("Creating {} input splits", numSplits);
+      if (dbAccessor == null) {
+        dbAccessor = DatabaseAccessorFactory.getAccessor(job);
+      }
+
+      int numRecords = dbAccessor.getTotalNumberOfRecords(job);
+      int numRecordsPerSplit = numRecords / numSplits;
+      int numSplitsWithExtraRecords = numRecords % numSplits;
+
+      LOGGER.debug("Num records = {}", numRecords);
+      InputSplit[] splits = new InputSplit[numSplits];
+      Path[] tablePaths = FileInputFormat.getInputPaths(job);
+
+      int offset = 0;
+      for (int i = 0; i < numSplits; i++) {
+        int numRecordsInThisSplit = numRecordsPerSplit;
+        if (i < numSplitsWithExtraRecords) {
+          numRecordsInThisSplit++;
+        }
+
+        splits[i] = new JdbcInputSplit(numRecordsInThisSplit, offset, tablePaths[0]);
+        offset += numRecordsInThisSplit;
+      }
+
+      return splits;
+    }
+    catch (Exception e) {
+      LOGGER.error("Error while splitting input data.", e);
+      throw new IOException(e);
+    }
+  }
+
+
+  /**
+   * For testing purposes only
+   *
+   * @param dbAccessor
+   *            DatabaseAccessor object
+   */
+  public void setDbAccessor(DatabaseAccessor dbAccessor) {
+    this.dbAccessor = dbAccessor;
+  }
+
+}
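
The remainder records (numRecords % numSplits) above are handed out one at a time to the leading splits, so split sizes differ by at most one. A standalone sketch of the same arithmetic with made-up numbers:

    public class SplitMathExample {
      public static void main(String[] args) {
        int numRecords = 10, numSplits = 3;    // illustrative values
        int perSplit = numRecords / numSplits; // 3
        int extras = numRecords % numSplits;   // 1
        int offset = 0;
        for (int i = 0; i < numSplits; i++) {
          int limit = perSplit + (i < extras ? 1 : 0);
          System.out.println("split " + i + ": offset=" + offset + ", limit=" + limit);
          offset += limit;
        }
        // prints limits 4, 3, 3 at offsets 0, 4, 7
      }
    }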

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
new file mode 100644
index 0000000..a691cc2
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputSplit.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class JdbcInputSplit extends FileSplit implements InputSplit {
+
+  private static final String[] EMPTY_ARRAY = new String[] {};
+
+  private int limit = 0;
+  private int offset = 0;
+
+
+  public JdbcInputSplit() {
+    super((Path) null, 0, 0, EMPTY_ARRAY);
+
+  }
+
+
+  // Note: the first argument is the split's record limit and the second its
+  // offset, matching the call in JdbcInputFormat.getSplits().
+  public JdbcInputSplit(long limit, long offset, Path dummyPath) {
+    super(dummyPath, 0, 0, EMPTY_ARRAY);
+    this.setLimit((int) limit);
+    this.setOffset((int) offset);
+  }
+
+
+  public JdbcInputSplit(int limit, int offset) {
+    super((Path) null, 0, 0, EMPTY_ARRAY);
+    this.limit = limit;
+    this.offset = offset;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(limit);
+    out.writeInt(offset);
+  }
+
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    limit = in.readInt();
+    offset = in.readInt();
+  }
+
+
+  @Override
+  public long getLength() {
+    return limit;
+  }
+
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return EMPTY_ARRAY;
+  }
+
+
+  public int getLimit() {
+    return limit;
+  }
+
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+
+  public int getOffset() {
+    return offset;
+  }
+
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java
new file mode 100644
index 0000000..26fb3cd
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcOutputFormat.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class JdbcOutputFormat implements OutputFormat<NullWritable, MapWritable>,
+                                         HiveOutputFormat<NullWritable, MapWritable> {
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public RecordWriter getHiveRecordWriter(JobConf jc,
+      Path finalOutPath,
+      Class<? extends Writable> valueClass,
+      boolean isCompressed,
+      Properties tableProperties,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Write operations are not allowed.");
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, MapWritable> getRecordWriter(FileSystem ignored,
+      JobConf job,
+      String name,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Write operations are not allowed.");
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    // do nothing
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
new file mode 100644
index 0000000..0a24bd9
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+import org.apache.hive.storage.jdbc.dao.JdbcRecordIterator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordReader.class);
+  private DatabaseAccessor dbAccessor = null;
+  private JdbcRecordIterator iterator = null;
+  private JdbcInputSplit split = null;
+  private JobConf conf = null;
+  private int pos = 0;
+
+
+  public JdbcRecordReader(JobConf conf, JdbcInputSplit split) {
+    LOGGER.debug("Initializing JdbcRecordReader");
+    this.split = split;
+    this.conf = conf;
+  }
+
+
+  @Override
+  public boolean next(LongWritable key, MapWritable value) throws IOException {
+    try {
+      LOGGER.debug("JdbcRecordReader.next called");
+      if (dbAccessor == null) {
+        dbAccessor = DatabaseAccessorFactory.getAccessor(conf);
+        iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset());
+      }
+
+      if (iterator.hasNext()) {
+        LOGGER.debug("JdbcRecordReader has more records to read.");
+        key.set(pos);
+        pos++;
+        Map<String, String> record = iterator.next();
+        if ((record != null) && (!record.isEmpty())) {
+          for (Entry<String, String> entry : record.entrySet()) {
+            value.put(new Text(entry.getKey()), new Text(entry.getValue()));
+          }
+          return true;
+        }
+        else {
+          LOGGER.debug("JdbcRecordReader got null record.");
+          return false;
+        }
+      }
+      else {
+        LOGGER.debug("JdbcRecordReader has no more records to read.");
+        return false;
+      }
+    }
+    catch (Exception e) {
+      LOGGER.error("An error occurred while reading the next record from DB.", 
e);
+      return false;
+    }
+  }
+
+
+  @Override
+  public LongWritable createKey() {
+    return new LongWritable();
+  }
+
+
+  @Override
+  public MapWritable createValue() {
+    return new MapWritable();
+  }
+
+
+  @Override
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    if (iterator != null) {
+      iterator.close();
+    }
+  }
+
+
+  @Override
+  public float getProgress() throws IOException {
+    if (split == null) {
+      return 0;
+    }
+    else {
+      return split.getLength() > 0 ? pos / (float) split.getLength() : 1.0f;
+    }
+  }
+
+
+  public void setDbAccessor(DatabaseAccessor dbAccessor) {
+    this.dbAccessor = dbAccessor;
+  }
+
+
+  public void setIterator(JdbcRecordIterator iterator) {
+    this.iterator = iterator;
+  }
+
+}
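
For orientation, the reader above is driven through the standard old-API RecordReader loop. A hedged sketch, assuming the JobConf and JdbcInputSplit are built elsewhere:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.mapred.JobConf;

    import org.apache.hive.storage.jdbc.JdbcInputSplit;
    import org.apache.hive.storage.jdbc.JdbcRecordReader;

    import java.io.IOException;

    public class ReaderLoopExample {
      // Drains one split; conf and split are assumed to carry a valid hive.sql.* setup.
      static void drain(JobConf conf, JdbcInputSplit split) throws IOException {
        JdbcRecordReader reader = new JdbcRecordReader(conf, split);
        LongWritable key = reader.createKey();
        MapWritable value = reader.createValue();
        try {
          while (reader.next(key, value)) {
            // value maps column name to column value, both as Text
            System.out.println(key + " -> " + value);
          }
        } finally {
          reader.close();
        }
      }
    }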

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
new file mode 100644
index 0000000..f35c33d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+public class JdbcSerDe extends AbstractSerDe {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSerDe.class);
+
+  private StructObjectInspector objectInspector;
+  private int numColumns;
+  private String[] hiveColumnTypeArray;
+  private List<String> columnNames;
+  private List<String> row;
+
+
+  /*
+   * This method gets called multiple times by Hive. On some invocations, the properties will be empty.
+   * We need to detect when the properties are not empty to initialise the class variables.
+   *
+   * @see org.apache.hadoop.hive.serde2.Deserializer#initialize(org.apache.hadoop.conf.Configuration, java.util.Properties)
+   */
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    try {
+      LOGGER.debug("Initializing the SerDe");
+
+      // Hive cdh-4.3 does not provide the properties object on all calls
+      if (tbl.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) {
+        Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(tbl);
+
+        DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig);
+        columnNames = dbAccessor.getColumnNames(tableConfig);
+        numColumns = columnNames.size();
+
+        String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ",");
+        if (numColumns != hiveColumnNameArray.length) {
+          throw new SerDeException("Expected " + numColumns + " columns. Table definition has "
+              + hiveColumnNameArray.length + " columns");
+        }
+        List<String> hiveColumnNames = Arrays.asList(hiveColumnNameArray);
+
+        hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":");
+        if (hiveColumnTypeArray.length == 0) {
+          throw new SerDeException("Received an empty Hive column type definition");
+        }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>(numColumns);
+        for (int i = 0; i < numColumns; i++) {
+          fieldInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+        }
+
+        objectInspector =
+          ObjectInspectorFactory.getStandardStructObjectInspector(hiveColumnNames,
+              fieldInspectors);
+        row = new ArrayList<String>(numColumns);
+      }
+    }
+    catch (Exception e) {
+      LOGGER.error("Caught exception while initializing the SqlSerDe", e);
+      throw new SerDeException(e);
+    }
+  }
+
+
+  private String[] parseProperty(String propertyValue, String delimiter) {
+    if ((propertyValue == null) || (propertyValue.trim().isEmpty())) {
+      return new String[] {};
+    }
+
+    return propertyValue.split(delimiter);
+  }
+
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    LOGGER.debug("Deserializing from SerDe");
+    if (!(blob instanceof MapWritable)) {
+      throw new SerDeException("Expected MapWritable. Got " + 
blob.getClass().getName());
+    }
+
+    if ((row == null) || (columnNames == null)) {
+      throw new SerDeException("JDBC SerDe hasn't been initialized properly");
+    }
+
+    row.clear();
+    MapWritable input = (MapWritable) blob;
+    Text columnKey = new Text();
+
+    for (int i = 0; i < numColumns; i++) {
+      columnKey.set(columnNames.get(i));
+      Writable value = input.get(columnKey);
+      if (value == null) {
+        row.add(null);
+      }
+      else {
+        row.add(value.toString());
+      }
+    }
+
+    return row;
+  }
+
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return objectInspector;
+  }
+
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return MapWritable.class;
+  }
+
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    throw new UnsupportedOperationException("Writes are not allowed");
+  }
+
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+}
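
To make the data flow concrete: the SerDe consumes the MapWritable produced by JdbcRecordReader (column name to value, both as Text) and flattens it into a row of strings ordered by the database column names. A small sketch of that input shape, with hypothetical column names:

    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;

    public class SerDeInputShapeExample {
      public static void main(String[] args) {
        // The Writable handed to JdbcSerDe.deserialize(); names are illustrative.
        MapWritable record = new MapWritable();
        record.put(new Text("id"), new Text("1"));
        record.put(new Text("name"), new Text("alice"));
        // deserialize() looks up each known column name in this map and emits
        // the values as a List<String> in column order, null for missing columns.
        System.out.println(record.entrySet());
      }
    }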

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java
new file mode 100644
index 0000000..946ee0c
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class JdbcStorageHandler implements HiveStorageHandler {
+
+  private Configuration conf;
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return JdbcInputFormat.class;
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return JdbcOutputFormat.class;
+  }
+
+
+  @Override
+  public Class<? extends AbstractSerDe> getSerDeClass() {
+    return JdbcSerDe.class;
+  }
+
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return null;
+  }
+
+
+  @Override
+  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    Properties properties = tableDesc.getProperties();
+    JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties);
+  }
+
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    Properties properties = tableDesc.getProperties();
+    JdbcStorageConfigManager.copyConfigurationToJob(properties, jobProperties);
+  }
+
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    // Nothing to do here...
+  }
+
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    return null;
+  }
+
+  @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java
new file mode 100644
index 0000000..194fad8
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/QueryConditionBuilder.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+
+import java.beans.XMLDecoder;
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Translates the hive query condition into a condition that can be run on the underlying database
+ */
+public class QueryConditionBuilder {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryConditionBuilder.class);
+  private static final String EMPTY_STRING = "";
+  private static QueryConditionBuilder instance = null;
+
+
+  public static QueryConditionBuilder getInstance() {
+    if (instance == null) {
+      instance = new QueryConditionBuilder();
+    }
+
+    return instance;
+  }
+
+
+  private QueryConditionBuilder() {
+
+  }
+
+
+  public String buildCondition(Configuration conf) {
+    if (conf == null) {
+      return EMPTY_STRING;
+    }
+
+    String filterXml = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    String hiveColumns = conf.get(serdeConstants.LIST_COLUMNS);
+    String columnMapping = conf.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName());
+
+    if ((filterXml == null) || ((columnMapping == null) && (hiveColumns == null))) {
+      return EMPTY_STRING;
+    }
+
+    if (hiveColumns == null) {
+      hiveColumns = "";
+    }
+
+    Map<String, String> columnMap = buildColumnMapping(columnMapping, hiveColumns);
+    String condition = createConditionString(filterXml, columnMap);
+    return condition;
+  }
+
+
+  /*
+   * Build a Hive-to-database column name mapping.
+   *
+   */
+  private Map<String, String> buildColumnMapping(String columnMapping, String hiveColumns) {
+    if ((columnMapping == null) || (columnMapping.trim().isEmpty())) {
+      return createIdentityMap(hiveColumns);
+    }
+
+    Map<String, String> columnMap = new HashMap<String, String>();
+    String[] mappingPairs = columnMapping.toLowerCase().split(",");
+    for (String mapPair : mappingPairs) {
+      String[] columns = mapPair.split("=");
+      columnMap.put(columns[0].trim(), columns[1].trim());
+    }
+
+    return columnMap;
+  }
+
+
+  /*
+   * When no mapping is defined, it is assumed that the hive column names are equivalent to the column names in the
+   * underlying table
+   */
+  private Map<String, String> createIdentityMap(String hiveColumns) {
+    Map<String, String> columnMap = new HashMap<String, String>();
+    String[] columns = hiveColumns.toLowerCase().split(",");
+
+    for (String col : columns) {
+      columnMap.put(col.trim(), col.trim());
+    }
+
+    return columnMap;
+  }
+
+
+  /*
+   * Walk the Hive AST and translate the hive column names to their equivalent mappings. This is basically a cheat.
+   *
+   */
+  private String createConditionString(String filterXml, Map<String, String> columnMap) {
+    if ((filterXml == null) || (filterXml.trim().isEmpty())) {
+      return EMPTY_STRING;
+    }
+
+    try (XMLDecoder decoder = new XMLDecoder(new ByteArrayInputStream(filterXml.getBytes("UTF-8")))) {
+      Object object = decoder.readObject();
+      if (!(object instanceof ExprNodeDesc)) {
+        LOGGER.error("Deserialized filter expression is not of the expected 
type");
+        throw new RuntimeException("Deserialized filter expression is not of 
the expected type");
+      }
+
+      ExprNodeDesc conditionNode = (ExprNodeDesc) object;
+      walkTreeAndTranslateColumnNames(conditionNode, columnMap);
+      return conditionNode.getExprString();
+    }
+    catch (Exception e) {
+      LOGGER.error("Error during condition build", e);
+      return EMPTY_STRING;
+    }
+  }
+
+
+  /*
+   * Translate column names by walking the AST
+   */
+  private void walkTreeAndTranslateColumnNames(ExprNodeDesc node, Map<String, String> columnMap) {
+    if (node == null) {
+      return;
+    }
+
+    if (node instanceof ExprNodeColumnDesc) {
+      ExprNodeColumnDesc column = (ExprNodeColumnDesc) node;
+      String hiveColumnName = column.getColumn().toLowerCase();
+      if (columnMap.containsKey(hiveColumnName)) {
+        String dbColumnName = columnMap.get(hiveColumnName);
+        String finalName = formatColumnName(dbColumnName);
+        column.setColumn(finalName);
+      }
+    }
+    else {
+      if (node.getChildren() != null) {
+        for (ExprNodeDesc childNode : node.getChildren()) {
+          walkTreeAndTranslateColumnNames(childNode, columnMap);
+        }
+      }
+    }
+  }
+
+
+  /**
+   * This is an ugly hack for handling date column types because Hive doesn't have a built-in type for dates
+   */
+  private String formatColumnName(String dbColumnName) {
+    if (dbColumnName.contains(":")) {
+      String[] typeSplit = dbColumnName.split(":");
+
+      if (typeSplit[1].equalsIgnoreCase("date")) {
+        return "{d " + typeSplit[0] + "}";
+      }
+
+      return typeSplit[0];
+    }
+    else {
+      return dbColumnName;
+    }
+  }
+}
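
To illustrate the string handling above: hive.sql.column.mapping is a comma-separated list of hivecol=dbcol pairs, and a database column name carrying a ":date" suffix is rewritten into a JDBC date escape by formatColumnName. A self-contained re-trace with made-up column names:

    import java.util.HashMap;
    import java.util.Map;

    public class ColumnMappingExample {
      public static void main(String[] args) {
        String columnMapping = "visitor_id=user_fk,sent_at=created:date"; // illustrative
        // Same parsing as buildColumnMapping: lowercase, split pairs on ',' and '='.
        Map<String, String> columnMap = new HashMap<String, String>();
        for (String pair : columnMapping.toLowerCase().split(",")) {
          String[] cols = pair.split("=");
          columnMap.put(cols[0].trim(), cols[1].trim());
        }
        // Same rewrite as formatColumnName: a ":date" suffix becomes {d column}.
        for (String dbColumn : columnMap.values()) {
          String[] typeSplit = dbColumn.split(":");
          String formatted = (typeSplit.length > 1 && typeSplit[1].equalsIgnoreCase("date"))
              ? "{d " + typeSplit[0] + "}" : typeSplit[0];
          System.out.println(dbColumn + " -> " + formatted); // created:date -> {d created}
        }
      }
    }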

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java
new file mode 100644
index 0000000..7a787d4
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManager.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import java.util.Properties;
+
+public interface CustomConfigManager {
+
+  void checkRequiredProperties(Properties properties);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java
new file mode 100644
index 0000000..eed0dff
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/CustomConfigManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import java.util.Properties;
+
+/**
+ * Factory for creating custom config managers based on the database type
+ */
+public class CustomConfigManagerFactory {
+
+  private static CustomConfigManager nopConfigManager = new NopCustomConfigManager();
+
+
+  private CustomConfigManagerFactory() {
+  }
+
+
+  public static CustomConfigManager getCustomConfigManagerFor(DatabaseType databaseType) {
+    switch (databaseType) {
+    case MYSQL:
+      return nopConfigManager;
+
+    default:
+      return nopConfigManager;
+    }
+  }
+
+  private static class NopCustomConfigManager implements CustomConfigManager {
+
+    @Override
+    public void checkRequiredProperties(Properties properties) {
+      return;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java
new file mode 100644
index 0000000..a2bdbe4
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java
@@ -0,0 +1,21 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+public enum DatabaseType {
+  MYSQL,
+  H2,
+  DERBY
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java
new file mode 100644
index 0000000..ff6357d
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+public enum JdbcStorageConfig {
+  DATABASE_TYPE("database.type", true),
+  JDBC_URL("jdbc.url", true),
+  JDBC_DRIVER_CLASS("jdbc.driver", true),
+  QUERY("query", true),
+  JDBC_FETCH_SIZE("jdbc.fetch.size", false),
+  COLUMN_MAPPING("column.mapping", false);
+
+  private String propertyName;
+  private boolean required = false;
+
+
+  JdbcStorageConfig(String propertyName, boolean required) {
+    this.propertyName = propertyName;
+    this.required = required;
+  }
+
+
+  JdbcStorageConfig(String propertyName) {
+    this.propertyName = propertyName;
+  }
+
+
+  public String getPropertyName() {
+    return JdbcStorageConfigManager.CONFIG_PREFIX + "." + propertyName;
+  }
+
+
+  public boolean isRequired() {
+    return required;
+  }
+
+}
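
Since getPropertyName() prepends the hive.sql prefix, the externally visible keys can be listed directly; a quick sketch:

    import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;

    public class ConfigKeyListing {
      public static void main(String[] args) {
        for (JdbcStorageConfig key : JdbcStorageConfig.values()) {
          // e.g. DATABASE_TYPE -> hive.sql.database.type, required=true
          System.out.println(key.getPropertyName() + ", required=" + key.isRequired());
        }
      }
    }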

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
new file mode 100644
index 0000000..5267cda
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hive.storage.jdbc.QueryConditionBuilder;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Main configuration handler class
+ */
+public class JdbcStorageConfigManager {
+
+  public static final String CONFIG_PREFIX = "hive.sql";
+  private static final EnumSet<JdbcStorageConfig> DEFAULT_REQUIRED_PROPERTIES =
+    EnumSet.of(JdbcStorageConfig.DATABASE_TYPE,
+        JdbcStorageConfig.JDBC_URL,
+        JdbcStorageConfig.JDBC_DRIVER_CLASS,
+        JdbcStorageConfig.QUERY);
+
+
+  private JdbcStorageConfigManager() {
+  }
+
+
+  public static void copyConfigurationToJob(Properties props, Map<String, String> jobProps) {
+    checkRequiredPropertiesAreDefined(props);
+    for (Entry<Object, Object> entry : props.entrySet()) {
+      jobProps.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+    }
+  }
+
+
+  public static Configuration convertPropertiesToConfiguration(Properties props) {
+    checkRequiredPropertiesAreDefined(props);
+    Configuration conf = new Configuration();
+
+    for (Entry<Object, Object> entry : props.entrySet()) {
+      conf.set(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+    }
+
+    return conf;
+  }
+
+
+  private static void checkRequiredPropertiesAreDefined(Properties props) {
+    for (JdbcStorageConfig configKey : DEFAULT_REQUIRED_PROPERTIES) {
+      String propertyKey = configKey.getPropertyName();
+      if ((props == null) || (!props.containsKey(propertyKey)) || (isEmptyString(props.getProperty(propertyKey)))) {
+        throw new IllegalArgumentException("Property " + propertyKey + " is required.");
+      }
+    }
+
+    DatabaseType dbType = DatabaseType.valueOf(props.getProperty(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()));
+    CustomConfigManager configManager = CustomConfigManagerFactory.getCustomConfigManagerFor(dbType);
+    configManager.checkRequiredProperties(props);
+  }
+
+
+  public static String getConfigValue(JdbcStorageConfig key, Configuration config) {
+    return config.get(key.getPropertyName());
+  }
+
+
+  public static String getQueryToExecute(Configuration config) {
+    String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
+    String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config);
+    if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) {
+      query = query + " WHERE " + hiveFilterCondition;
+    }
+
+    return query;
+  }
+
+
+  private static boolean isEmptyString(String value) {
+    return ((value == null) || (value.trim().isEmpty()));
+  }
+
+}
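
Putting it together, getQueryToExecute simply appends the translated filter as a WHERE clause, which implicitly assumes the configured hive.sql.query carries no WHERE clause of its own. A minimal sketch with an illustrative query:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
    import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;

    public class QueryBuildExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT id, name FROM persons"); // illustrative
        // With no pushed-down filter present, buildCondition returns "" and the
        // query passes through unchanged; with a filter it becomes
        // "SELECT id, name FROM persons WHERE <translated condition>".
        System.out.println(JdbcStorageConfigManager.getQueryToExecute(conf));
      }
    }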

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
new file mode 100644
index 0000000..f50d53e
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
+
+import java.util.List;
+
+public interface DatabaseAccessor {
+
+  List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException;
+
+
+  int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;
+
+
+  JdbcRecordIterator
+    getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
new file mode 100644
index 0000000..7dc690f
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hive.storage.jdbc.conf.DatabaseType;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+
+/**
+ * Factory for creating the correct DatabaseAccessor class for the job
+ */
+public class DatabaseAccessorFactory {
+
+  private DatabaseAccessorFactory() {
+  }
+
+
+  public static DatabaseAccessor getAccessor(DatabaseType dbType) {
+
+    DatabaseAccessor accessor = null;
+    switch (dbType) {
+    case MYSQL:
+      accessor = new MySqlDatabaseAccessor();
+      break;
+
+    default:
+      accessor = new GenericJdbcDatabaseAccessor();
+      break;
+    }
+
+    return accessor;
+  }
+
+
+  public static DatabaseAccessor getAccessor(Configuration conf) {
+    DatabaseType dbType = DatabaseType.valueOf(conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()));
+    return getAccessor(dbType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
new file mode 100644
index 0000000..b655aec
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * A data accessor that should in theory work with all JDBC compliant database drivers.
+ */
+public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
+
+  protected static final String DBCP_CONFIG_PREFIX = JdbcStorageConfigManager.CONFIG_PREFIX + ".dbcp";
+  protected static final int DEFAULT_FETCH_SIZE = 1000;
+  protected static final Logger LOGGER = LoggerFactory.getLogger(GenericJdbcDatabaseAccessor.class);
+  protected DataSource dbcpDataSource = null;
+
+
+  public GenericJdbcDatabaseAccessor() {
+  }
+
+
+  @Override
+  public List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseConnection(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String metadataQuery = addLimitToQuery(sql, 1);
+      LOGGER.debug("Query to execute is [{}]", metadataQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(metadataQuery);
+      rs = ps.executeQuery();
+
+      ResultSetMetaData metadata = rs.getMetaData();
+      int numColumns = metadata.getColumnCount();
+      List<String> columnNames = new ArrayList<String>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        columnNames.add(metadata.getColumnName(i + 1));
+      }
+
+      return columnNames;
+    }
+    catch (Exception e) {
+      LOGGER.error("Error while trying to get column names.", e);
+      throw new HiveJdbcDatabaseAccessException("Error while trying to get 
column names: " + e.getMessage(), e);
+    }
+    finally {
+      cleanupResources(conn, ps, rs);
+    }
+
+  }
+
+
+  @Override
+  public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseConnection(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable";
+      LOGGER.debug("Query to execute is [{}]", countQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(countQuery);
+      rs = ps.executeQuery();
+      if (rs.next()) {
+        return rs.getInt(1);
+      }
+      else {
+        LOGGER.warn("The count query [{}] did not return any results.", countQuery);
+        throw new HiveJdbcDatabaseAccessException("Count query did not return any results.");
+      }
+    }
+    catch (HiveJdbcDatabaseAccessException he) {
+      throw he;
+    }
+    catch (Exception e) {
+      LOGGER.error("Caught exception while trying to get the number of records", e);
+      throw new HiveJdbcDatabaseAccessException(e);
+    }
+    finally {
+      cleanupResources(conn, ps, rs);
+    }
+  }
+
+
+  @Override
+  public JdbcRecordIterator
+    getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException {
+
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseConnection(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String limitQuery = addLimitAndOffsetToQuery(sql, limit, offset);
+      LOGGER.debug("Query to execute is [{}]", limitQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(limitQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      ps.setFetchSize(getFetchSize(conf));
+      rs = ps.executeQuery();
+
+      return new JdbcRecordIterator(conn, ps, rs);
+    }
+    catch (Exception e) {
+      LOGGER.error("Caught exception while trying to execute query", e);
+      cleanupResources(conn, ps, rs);
+      throw new HiveJdbcDatabaseAccessException("Caught exception while trying 
to execute query", e);
+    }
+  }
+
+
+  /**
+   * Uses the generic JDBC escape syntax to add limit and offset clauses to a query string.
+   *
+   * @param sql the query to decorate
+   * @param limit the maximum number of rows to return
+   * @param offset the number of rows to skip
+   * @return the decorated query string
+   */
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    }
+    else {
+      return sql + " {LIMIT " + limit + " OFFSET " + offset + "}";
+    }
+  }
+
+
+  /*
+   * Uses the generic JDBC escape syntax to add a limit clause to a query string.
+   */
+  protected String addLimitToQuery(String sql, int limit) {
+    return sql + " {LIMIT " + limit + "}";
+  }
+
+
+  protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
+    try {
+      if (rs != null) {
+        rs.close();
+      }
+    } catch (SQLException e) {
+      LOGGER.warn("Caught exception during resultset cleanup.", e);
+    }
+
+    try {
+      if (ps != null) {
+        ps.close();
+      }
+    } catch (SQLException e) {
+      LOGGER.warn("Caught exception during statement cleanup.", e);
+    }
+
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (SQLException e) {
+      LOGGER.warn("Caught exception during connection cleanup.", e);
+    }
+  }
+
+  protected void initializeDatabaseConnection(Configuration conf) throws Exception {
+    if (dbcpDataSource == null) {
+      synchronized (this) {
+        if (dbcpDataSource == null) {
+          Properties props = getConnectionPoolProperties(conf);
+          dbcpDataSource = BasicDataSourceFactory.createDataSource(props);
+        }
+      }
+    }
+  }
+
+
+  protected Properties getConnectionPoolProperties(Configuration conf) {
+    // Create the default properties object
+    Properties dbProperties = getDefaultDBCPProperties();
+
+    // override with user defined properties
+    Map<String, String> userProperties = conf.getValByRegex(DBCP_CONFIG_PREFIX + "\\.*");
+    if ((userProperties != null) && (!userProperties.isEmpty())) {
+      for (Entry<String, String> entry : userProperties.entrySet()) {
+        dbProperties.put(entry.getKey().replaceFirst(DBCP_CONFIG_PREFIX + "\\.", ""), entry.getValue());
+      }
+    }
+
+    // essential properties that shouldn't be overridden by users
+    dbProperties.put("url", 
conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+    dbProperties.put("driverClassName", 
conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
+    dbProperties.put("type", "javax.sql.DataSource");
+    return dbProperties;
+  }
+
+
+  protected Properties getDefaultDBCPProperties() {
+    Properties props = new Properties();
+    props.put("initialSize", "1");
+    props.put("maxActive", "3");
+    props.put("maxIdle", "0");
+    props.put("maxWait", "10000");
+    props.put("timeBetweenEvictionRunsMillis", "30000");
+    return props;
+  }
+
+
+  protected int getFetchSize(Configuration conf) {
+    return conf.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
+  }
+}
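
A minimal usage sketch of the DatabaseAccessor API above (the configuration
keys come from the JdbcStorageConfig enum added in this patch; the MySQL
driver class, URL, and query values are illustrative assumptions, not part
of the patch):

    Configuration conf = new Configuration();
    conf.set(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), "MYSQL");
    conf.set(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName(), "com.mysql.jdbc.Driver");
    conf.set(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc:mysql://localhost:3306/hive");
    conf.set(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1, col2 FROM sometable");

    DatabaseAccessor accessor = new GenericJdbcDatabaseAccessor();
    List<String> columns = accessor.getColumnNames(conf);  // runs the query with {LIMIT 1}
    int total = accessor.getTotalNumberOfRecords(conf);    // wraps the query in SELECT COUNT(*)

    JdbcRecordIterator it = accessor.getRecordIterator(conf, 10, 0);  // limit 10, offset 0
    try {
      while (it.hasNext()) {
        Map<String, String> row = it.next();  // column name -> string value
      }
    } finally {
      it.close();  // releases the connection, statement, and result set
    }

Connection-pool settings can also be tuned through keys carrying the DBCP
prefix defined above; a hypothetical override:

    conf.set(DBCP_CONFIG_PREFIX + ".username", "hive");  // prefix is stripped and "username" passed to DBCP
    conf.set(DBCP_CONFIG_PREFIX + ".maxActive", "5");    // overrides the default of 3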

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
new file mode 100644
index 0000000..4262502
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JdbcRecordIterator.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+import org.apache.hadoop.io.NullWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An iterator over the rows of a SQL result set. Includes a close() method that releases the underlying resources.
+ */
+public class JdbcRecordIterator implements Iterator<Map<String, String>> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordIterator.class);
+
+  private Connection conn;
+  private PreparedStatement ps;
+  private ResultSet rs;
+
+
+  public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs) {
+    this.conn = conn;
+    this.ps = ps;
+    this.rs = rs;
+  }
+
+
+  @Override
+  public boolean hasNext() {
+    try {
+      return rs.next();
+    }
+    catch (Exception se) {
+      LOGGER.warn("hasNext() threw exception", se);
+      return false;
+    }
+  }
+
+
+  @Override
+  public Map<String, String> next() {
+    try {
+      ResultSetMetaData metadata = rs.getMetaData();
+      int numColumns = metadata.getColumnCount();
+      Map<String, String> record = new HashMap<String, String>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        String key = metadata.getColumnName(i + 1);
+        String value = rs.getString(i + 1);
+        if (value == null) {
+          value = NullWritable.get().toString();
+        }
+        record.put(key, value);
+      }
+
+      return record;
+    }
+    catch (Exception e) {
+      LOGGER.warn("next() threw exception", e);
+      return null;
+    }
+  }
+
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove is not supported");
+  }
+
+
+  /**
+   * Release all DB resources
+   */
+  public void close() {
+    try {
+      rs.close();
+      ps.close();
+      conn.close();
+    }
+    catch (Exception e) {
+      LOGGER.warn("Caught exception while trying to close database objects", 
e);
+    }
+  }
+
+}
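
A behavioral note worth calling out: hasNext() advances the underlying
ResultSet cursor and next() reads the row the cursor is on, so callers must
strictly alternate the two calls; a second hasNext() in a row would silently
skip a record. A sketch (the "name" column is a hypothetical example):

    JdbcRecordIterator it = accessor.getRecordIterator(conf, limit, offset);
    while (it.hasNext()) {                  // advances the cursor
      Map<String, String> row = it.next();  // reads the current row
      String name = row.get("name");        // SQL NULL arrives as NullWritable.get().toString(), i.e. "(null)"
    }
    it.close();                             // release connection, statement, result set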

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
new file mode 100644
index 0000000..7d821d8
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/MySqlDatabaseAccessor.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+/**
+ * MySQL-specific data accessor. Needed because MySQL's JDBC driver does not support the generic LIMIT and OFFSET
+ * escape syntax.
+ */
+public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    }
+    else {
+      return sql + " LIMIT " + limit + "," + offset;
+    }
+  }
+
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    return sql + " LIMIT " + limit;
+  }
+
+}
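
To make the dialect difference concrete, the same page request renders
differently through the two accessors (the query string is illustrative, and
both methods are protected helpers, shown here as if called from a subclass):

    String sql = "SELECT id FROM t";
    addLimitAndOffsetToQuery(sql, 10, 20);
    // GenericJdbcDatabaseAccessor: "SELECT id FROM t {LIMIT 10 OFFSET 20}"  (JDBC escape syntax)
    // MySqlDatabaseAccessor:       "SELECT id FROM t LIMIT 20,10"           (MySQL's LIMIT offset,row_count)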

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java
new file mode 100644
index 0000000..cde859f
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcDatabaseAccessException.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.exception;
+
+public class HiveJdbcDatabaseAccessException extends HiveJdbcStorageException {
+
+  private static final long serialVersionUID = -4106595742876276803L;
+
+
+  public HiveJdbcDatabaseAccessException() {
+    super();
+  }
+
+
+  public HiveJdbcDatabaseAccessException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+
+  public HiveJdbcDatabaseAccessException(String message) {
+    super(message);
+  }
+
+
+  public HiveJdbcDatabaseAccessException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcStorageException.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcStorageException.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcStorageException.java
new file mode 100644
index 0000000..1317838
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/exception/HiveJdbcStorageException.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.exception;
+
+public class HiveJdbcStorageException extends Exception {
+
+  private static final long serialVersionUID = 4858210651037826401L;
+
+
+  public HiveJdbcStorageException() {
+    super();
+  }
+
+
+  public HiveJdbcStorageException(String message) {
+    super(message);
+  }
+
+
+  public HiveJdbcStorageException(Throwable cause) {
+    super(cause);
+  }
+
+
+  public HiveJdbcStorageException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/TestSuite.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/TestSuite.java b/jdbc-handler/src/test/java/org/apache/TestSuite.java
new file mode 100644
index 0000000..df8eab7
--- /dev/null
+++ b/jdbc-handler/src/test/java/org/apache/TestSuite.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+import org.apache.hive.config.JdbcStorageConfigManagerTest;
+import org.apache.hive.storage.jdbc.QueryConditionBuilderTest;
+import org.apache.hive.storage.jdbc.dao.GenericJdbcDatabaseAccessorTest;
+
+@RunWith(Suite.class)
+@SuiteClasses({ JdbcStorageConfigManagerTest.class, GenericJdbcDatabaseAccessorTest.class,
+        QueryConditionBuilderTest.class })
+public class TestSuite {
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java b/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java
new file mode 100644
index 0000000..c950831
--- /dev/null
+++ b/jdbc-handler/src/test/java/org/apache/hive/config/JdbcStorageConfigManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.config;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+import org.apache.hive.storage.jdbc.conf.DatabaseType;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class JdbcStorageConfigManagerTest {
+
+  @Test
+  public void testWithAllRequiredSettingsDefined() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), DatabaseType.MYSQL.toString());
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+    props.put(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName(), "com.mysql.jdbc.Driver");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+
+    assertThat(jobMap, is(notNullValue()));
+    assertThat(jobMap.size(), is(equalTo(4)));
+    assertThat(jobMap.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()), is(equalTo("MYSQL")));
+    assertThat(jobMap.get(JdbcStorageConfig.JDBC_URL.getPropertyName()), is(equalTo("jdbc://localhost:3306/hive")));
+    assertThat(jobMap.get(JdbcStorageConfig.QUERY.getPropertyName()),
+        is(equalTo("SELECT col1,col2,col3 FROM sometable")));
+  }
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithJdbcUrlMissing() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), DatabaseType.MYSQL.toString());
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithDatabaseTypeMissing() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithUnknownDatabaseType() {
+    Properties props = new Properties();
+    props.put(JdbcStorageConfig.DATABASE_TYPE.getPropertyName(), "Postgres");
+    props.put(JdbcStorageConfig.JDBC_URL.getPropertyName(), "jdbc://localhost:3306/hive");
+    props.put(JdbcStorageConfig.QUERY.getPropertyName(), "SELECT col1,col2,col3 FROM sometable");
+
+    Map<String, String> jobMap = new HashMap<String, String>();
+    JdbcStorageConfigManager.copyConfigurationToJob(props, jobMap);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java
new file mode 100644
index 0000000..cc6acf1
--- /dev/null
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/JdbcInputFormatTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.hive.storage.jdbc.dao.DatabaseAccessor;
+import org.apache.hive.storage.jdbc.exception.HiveJdbcDatabaseAccessException;
+
+import java.io.IOException;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JdbcInputFormatTest {
+
+  @Mock
+  private DatabaseAccessor mockDatabaseAccessor;
+
+
+  @Test
+  public void testSplitLogic_noSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getTotalNumberOfRecords(any(Configuration.class))).thenReturn(15);
+    f.setDbAccessor(mockDatabaseAccessor);
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    InputSplit[] splits = f.getSplits(conf, 3);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(3));
+
+    assertThat(splits[0].getLength(), is(5L));
+  }
+
+
+  @Test
+  public void testSplitLogic_withSpillOver() throws HiveJdbcDatabaseAccessException, IOException {
+    JdbcInputFormat f = new JdbcInputFormat();
+    when(mockDatabaseAccessor.getTotalNumberOfRecords(any(Configuration.class))).thenReturn(15);
+    f.setDbAccessor(mockDatabaseAccessor);
+
+    JobConf conf = new JobConf();
+    conf.set("mapred.input.dir", "/temp");
+    InputSplit[] splits = f.getSplits(conf, 6);
+
+    assertThat(splits, is(notNullValue()));
+    assertThat(splits.length, is(6));
+
+    for (int i = 0; i < 3; i++) {
+      assertThat(splits[i].getLength(), is(3L));
+    }
+
+    for (int i = 3; i < 6; i++) {
+      assertThat(splits[i].getLength(), is(2L));
+    }
+  }
+}
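
The expected lengths in the two tests follow from dividing the record count
evenly across the splits and spreading the remainder over the leading ones; a
sketch of that arithmetic, inferred from the assertions above rather than
copied from JdbcInputFormat:

    int numRecords = 15;
    int numSplits = 6;
    int base = numRecords / numSplits;       // 2
    int spillOver = numRecords % numSplits;  // 3
    for (int i = 0; i < numSplits; i++) {
      long length = base + (i < spillOver ? 1 : 0);  // 3, 3, 3, 2, 2, 2
    }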

http://git-wip-us.apache.org/repos/asf/hive/blob/12b27a35/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java
new file mode 100644
index 0000000..5cdae47
--- /dev/null
+++ b/jdbc-handler/src/test/java/org/apache/hive/storage/jdbc/QueryConditionBuilderTest.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc;
+
+import static org.hamcrest.Matchers.equalToIgnoringWhiteSpace;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfig;
+
+import java.io.IOException;
+import java.util.Scanner;
+
+public class QueryConditionBuilderTest {
+
+  private static String condition1;
+  private static String condition2;
+
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    condition1 = readFileContents("condition1.xml");
+    condition2 = readFileContents("condition2.xml");
+  }
+
+
+  private static String readFileContents(String name) throws IOException {
+    try (Scanner s = new Scanner(QueryConditionBuilderTest.class.getClassLoader().getResourceAsStream(name))) {
+      return s.useDelimiter("\\Z").next();
+    }
+  }
+
+
+  @Test
+  public void testSimpleCondition_noTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("(visitor_id = 'x')")));
+  }
+
+
+  @Test
+  public void testSimpleCondition_withTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=vid, sentiment=sentiment, tracking_id=tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("(vid = 'x')")));
+  }
+
+
+  @Test
+  public void testSimpleCondition_withDateType() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=vid:date, sentiment=sentiment, tracking_id=tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("({d vid} = 'x')")));
+  }
+
+
+  @Test
+  public void testSimpleCondition_withVariedCaseMappings() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition1);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_ID,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(),
+        "visitor_id=VID:date, sentiment=sentiment, tracking_id=tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("({d vid} = 'x')")));
+  }
+
+
+  @Test
+  public void testMultipleConditions_noTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition2);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("((visitor_id = 'x') 
and (sentiment = 'y'))")));
+  }
+
+
+  @Test
+  public void testMultipleConditions_withTranslation() {
+    Configuration conf = new Configuration();
+    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, condition2);
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(), "visitor_id=v,sentiment=s,tracking_id=t");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition, is(equalToIgnoringWhiteSpace("((v = 'x') and (s = 
'y'))")));
+  }
+
+
+  @Test
+  public void testWithNullConf() {
+    String condition = QueryConditionBuilder.getInstance().buildCondition(null);
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition.trim().isEmpty(), is(true));
+  }
+
+
+  @Test
+  public void testWithUndefinedFilterExpr() {
+    Configuration conf = new Configuration();
+    conf.set(serdeConstants.LIST_COLUMNS, "visitor_id,sentiment,tracking_id");
+    conf.set(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName(), "visitor_id=v,sentiment=s,tracking_id=t");
+    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
+
+    assertThat(condition, is(notNullValue()));
+    assertThat(condition.trim().isEmpty(), is(true));
+  }
+
+}
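
A sketch of how such a condition string would typically be combined with the
configured query (the actual wiring lives elsewhere in the handler; the base
query and the WHERE concatenation are assumptions for illustration):

    String baseQuery = "SELECT vid, sentiment, tracking_id FROM visits";
    String condition = QueryConditionBuilder.getInstance().buildCondition(conf);
    String pushedDown = condition.trim().isEmpty()
        ? baseQuery
        : baseQuery + " WHERE " + condition;  // e.g. "... WHERE (vid = 'x')"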
