Repository: spark
Updated Branches:
  refs/heads/master a0d8a61ab -> e991255e7


[SPARK-6913][SQL] Fixed "java.sql.SQLException: No suitable driver found"

Fixed `java.sql.SQLException: No suitable driver found` when loading a DataFrame 
into Spark SQL if the JDBC driver is supplied with the `--jars` argument.

The problem is that the `java.sql.DriverManager` class cannot access drivers 
loaded by the Spark ClassLoader.

To work around this, a thin wrapper that forwards all requests to the underlying 
driver is created and registered with `DriverManager` for each such driver.

Also, it is no longer necessary to include JDBC drivers in `--driver-class-path` 
in local mode; specifying them in the `--jars` argument is sufficient.

Author: Vyacheslav Baranov <slavik.bara...@gmail.com>

Closes #5782 from SlavikBaranov/SPARK-6913 and squashes the following commits:

510c43f [Vyacheslav Baranov] [SPARK-6913] Fixed review comments
b2a727c [Vyacheslav Baranov] [SPARK-6913] Fixed thread race on driver registration
c8294ae [Vyacheslav Baranov] [SPARK-6913] Fixed "No suitable driver found" when using a JDBC driver added with SparkContext.addJar


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e991255e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e991255e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e991255e

Branch: refs/heads/master
Commit: e991255e7203a0f7080efbd71f57574f46076711
Parents: a0d8a61
Author: Vyacheslav Baranov <slavik.bara...@gmail.com>
Authored: Thu Apr 30 18:45:14 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Apr 30 18:45:14 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  2 +-
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  4 +-
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  | 60 +++++++++++++++++++-
 3 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e991255e/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index cef92ab..2f6ba48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -159,7 +159,7 @@ private[sql] object JDBCRDD extends Logging {
   def getConnector(driver: String, url: String, properties: Properties): () => Connection = {
     () => {
       try {
-        if (driver != null) Utils.getContextOrSparkClassLoader.loadClass(driver)
+        if (driver != null) DriverRegistry.register(driver)
       } catch {
         case e: ClassNotFoundException => {
           logWarning(s"Couldn't find class $driver", e);

http://git-wip-us.apache.org/repos/asf/spark/blob/e991255e/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 5f48008..d6b3fb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -100,7 +100,7 @@ private[sql] class DefaultSource extends RelationProvider {
     val upperBound = parameters.getOrElse("upperBound", null)
     val numPartitions = parameters.getOrElse("numPartitions", null)
 
-    if (driver != null) Utils.getContextOrSparkClassLoader.loadClass(driver)
+    if (driver != null) DriverRegistry.register(driver)
 
     if (partitionColumn != null
         && (lowerBound == null || upperBound == null || numPartitions == null)) {
@@ -136,7 +136,7 @@ private[sql] case class JDBCRelation(
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
+    val driver: String = DriverRegistry.getDriverClassName(url)
     JDBCRDD.scanTable(
       sqlContext.sparkContext,
       schema,

http://git-wip-us.apache.org/repos/asf/spark/blob/e991255e/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index d4e0abc..ae9af1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark.sql
 
-import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.sql.{Connection, Driver, DriverManager, DriverPropertyInfo, PreparedStatement}
+import java.util.Properties
+
+import scala.collection.mutable
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 package object jdbc {
   private[sql] object JDBCWriteDetails extends Logging {
@@ -179,4 +183,58 @@ package object jdbc {
     }
 
   }
+
+  private [sql] class DriverWrapper(val wrapped: Driver) extends Driver {
+    override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
+
+    override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
+
+    override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
+      wrapped.getPropertyInfo(url, info)
+    }
+
+    override def getMinorVersion: Int = wrapped.getMinorVersion
+
+    override def getParentLogger: java.util.logging.Logger = wrapped.getParentLogger
+
+    override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
+
+    override def getMajorVersion: Int = wrapped.getMajorVersion
+  }
+
+  /**
+   * java.sql.DriverManager is always loaded by bootstrap classloader,
+   * so it can't load JDBC drivers accessible by Spark ClassLoader.
+   *
+   * To solve the problem, drivers from user-supplied jars are wrapped
+   * into thin wrapper.
+   */
+  private [sql] object DriverRegistry extends Logging {
+
+    private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
+
+    def register(className: String): Unit = {
+      val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
+      if (cls.getClassLoader == null) {
+        logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
+      } else if (wrapperMap.get(className).isDefined) {
+        logTrace(s"Wrapper for $className already exists")
+      } else {
+        synchronized {
+          if (wrapperMap.get(className).isEmpty) {
+            val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
+            DriverManager.registerDriver(wrapper)
+            wrapperMap(className) = wrapper
+            logTrace(s"Wrapper for $className registered")
+          }
+        }
+      }
+    }
+    
+    def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
+      case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
+      case driver => driver.getClass.getCanonicalName  
+    }
+  }
+
 } // package object jdbc

