Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ce90bbe42 -> 64813b7e7


[SPARK-11998][SQL][TEST-HADOOP2.0] When downloading Hadoop artifacts from 
maven, we need to try to download the version that is used by Spark

If we need to download Hive/Hadoop artifacts, try to download the Hadoop 
version that matches the one used by Spark. If that Hadoop artifact cannot be 
resolved (e.g. the Hadoop version is a vendor-specific version like 
2.0.0-cdh4.1.1), we will fall back to Hadoop 2.4.0 (the version we previously 
hard-coded for download from Maven) and we will not share Hadoop classes 
between Spark and the Hive metastore client.

I tested this patch on my laptop with the following build configurations 
(these are the configurations used by our builds). All tests passed.
```
build/sbt -Phadoop-1 -Dhadoop.version=1.2.1 -Pkinesis-asl -Phive-thriftserver 
-Phive
build/sbt -Phadoop-1 -Dhadoop.version=2.0.0-mr1-cdh4.1.1 -Pkinesis-asl 
-Phive-thriftserver -Phive
build/sbt -Pyarn -Phadoop-2.2 -Pkinesis-asl -Phive-thriftserver -Phive
build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl 
-Phive-thriftserver -Phive
```

Author: Yin Huai <yh...@databricks.com>

Closes #9979 from yhuai/versionsSuite.

(cherry picked from commit ad76562390b81207f8f32491c0bd8ad0e020141f)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64813b7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64813b7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64813b7e

Branch: refs/heads/branch-1.6
Commit: 64813b7e7ed5d0367d5e8c58831d64dd4b1007d2
Parents: ce90bbe
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Nov 26 16:20:08 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Nov 26 16:20:19 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveContext.scala |  4 +-
 .../sql/hive/client/IsolatedClientLoader.scala  | 62 ++++++++++++++++----
 .../spark/sql/hive/client/VersionsSuite.scala   | 23 ++++++--
 3 files changed, 72 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64813b7e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8a42641..e83941c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
+import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
@@ -288,7 +289,8 @@ class HiveContext private[hive](
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using maven.")
       IsolatedClientLoader.forVersion(
-        version = hiveMetastoreVersion,
+        hiveMetastoreVersion = hiveMetastoreVersion,
+        hadoopVersion = VersionInfo.getVersion,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)

http://git-wip-us.apache.org/repos/asf/spark/blob/64813b7e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e041e0d..010051d 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -34,23 +34,51 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
-private[hive] object IsolatedClientLoader {
+private[hive] object IsolatedClientLoader extends Logging {
   /**
    * Creates isolated Hive client loaders by downloading the requested version 
from maven.
    */
   def forVersion(
-      version: String,
+      hiveMetastoreVersion: String,
+      hadoopVersion: String,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
       barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = 
synchronized {
-    val resolvedVersion = hiveVersion(version)
-    val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
-      downloadVersion(resolvedVersion, ivyPath))
+    val resolvedVersion = hiveVersion(hiveMetastoreVersion)
+    // We will first try to share Hadoop classes. If we cannot resolve the 
Hadoop artifact
+    // with the given version, we will use Hadoop 2.4.0 and then will not 
share Hadoop classes.
+    var sharesHadoopClasses = true
+    val files = if (resolvedVersions.contains((resolvedVersion, 
hadoopVersion))) {
+      resolvedVersions((resolvedVersion, hadoopVersion))
+    } else {
+      val (downloadedFiles, actualHadoopVersion) =
+        try {
+          (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), 
hadoopVersion)
+        } catch {
+          case e: RuntimeException if e.getMessage.contains("hadoop") =>
+            // If the error message contains hadoop, it is probably because 
the hadoop
+            // version cannot be resolved (e.g. it is a vendor specific 
version like
+            // 2.0.0-cdh4.1.1). If it is the case, we will try just
+            // "org.apache.hadoop:hadoop-client:2.4.0". 
"org.apache.hadoop:hadoop-client:2.4.0"
+            // is used just because we used to hard code it as the hadoop 
artifact to download.
+            logWarning(s"Failed to resolve Hadoop artifacts for the version 
${hadoopVersion}. " +
+              s"We will change the hadoop version from ${hadoopVersion} to 
2.4.0 and try again. " +
+              "Hadoop classes will not be shared between Spark and Hive 
metastore client. " +
+              "It is recommended to set jars used by Hive metastore client 
through " +
+              "spark.sql.hive.metastore.jars in the production environment.")
+            sharesHadoopClasses = false
+            (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
+        }
+      resolvedVersions.put((resolvedVersion, actualHadoopVersion), 
downloadedFiles)
+      resolvedVersions((resolvedVersion, actualHadoopVersion))
+    }
+
     new IsolatedClientLoader(
-      version = hiveVersion(version),
+      version = hiveVersion(hiveMetastoreVersion),
       execJars = files,
       config = config,
+      sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
       barrierPrefixes = barrierPrefixes)
   }
@@ -64,12 +92,15 @@ private[hive] object IsolatedClientLoader {
     case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
   }
 
-  private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): 
Seq[URL] = {
+  private def downloadVersion(
+      version: HiveVersion,
+      hadoopVersion: String,
+      ivyPath: Option[String]): Seq[URL] = {
     val hiveArtifacts = version.extraDeps ++
       Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
       Seq("com.google.guava:guava:14.0.1",
-        "org.apache.hadoop:hadoop-client:2.4.0")
+        s"org.apache.hadoop:hadoop-client:$hadoopVersion")
 
     val classpath = quietly {
       SparkSubmitUtils.resolveMavenCoordinates(
@@ -86,7 +117,10 @@ private[hive] object IsolatedClientLoader {
     tempDir.listFiles().map(_.toURI.toURL)
   }
 
-  private def resolvedVersions = new 
scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
+  // A map from a given pair of HiveVersion and Hadoop version to jar files.
+  // It is only used by forVersion.
+  private val resolvedVersions =
+    new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
 }
 
 /**
@@ -106,6 +140,7 @@ private[hive] object IsolatedClientLoader {
  * @param config   A set of options that will be added to the HiveConf of the 
constructed client.
  * @param isolationOn When true, custom versions of barrier classes will be 
constructed.  Must be
  *                    true unless loading the version of hive that is on 
Sparks classloader.
+ * @param sharesHadoopClasses When true, we will share Hadoop classes between 
Spark and the Hive metastore client.
  * @param rootClassLoader The system root classloader. Must not know about 
Hive classes.
  * @param baseClassLoader The spark classloader that is used to load shared 
classes.
  */
@@ -114,6 +149,7 @@ private[hive] class IsolatedClientLoader(
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
+    val sharesHadoopClasses: Boolean = true,
     val rootClassLoader: ClassLoader = 
ClassLoader.getSystemClassLoader.getParent.getParent,
     val baseClassLoader: ClassLoader = 
Thread.currentThread().getContextClassLoader,
     val sharedPrefixes: Seq[String] = Seq.empty,
@@ -126,16 +162,20 @@ private[hive] class IsolatedClientLoader(
   /** All jars used by the hive specific classloader. */
   protected def allJars = execJars.toArray
 
-  protected def isSharedClass(name: String): Boolean =
+  protected def isSharedClass(name: String): Boolean = {
+    val isHadoopClass =
+      name.startsWith("org.apache.hadoop.") && 
!name.startsWith("org.apache.hadoop.hive.")
+
     name.contains("slf4j") ||
     name.contains("log4j") ||
     name.startsWith("org.apache.spark.") ||
-    (name.startsWith("org.apache.hadoop.") && 
!name.startsWith("org.apache.hadoop.hive.")) ||
+    (sharesHadoopClasses && isHadoopClass) ||
     name.startsWith("scala.") ||
     (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
     name.startsWith("java.lang.") ||
     name.startsWith("java.net") ||
     sharedPrefixes.exists(name.startsWith)
+  }
 
   /** True if `name` refers to a spark class that must see specific version of 
Hive. */
   protected def isBarrierClass(name: String): Boolean =

http://git-wip-us.apache.org/repos/asf/spark/blob/64813b7e/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7bc13bc..502b240 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.util.VersionInfo
+
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, 
AttributeReference, EqualTo}
@@ -53,9 +55,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }
 
   test("success sanity check") {
-    val badClient = 
IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
-      buildConf(),
-      ivyPath).createClient()
+    val badClient = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
     val db = new HiveDatabase("default", "")
     badClient.createDatabase(db)
   }
@@ -85,7 +89,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
       val badClient = quietly {
-        IsolatedClientLoader.forVersion("13", buildConf(), 
ivyPath).createClient()
+        IsolatedClientLoader.forVersion(
+          hiveMetastoreVersion = "13",
+          hadoopVersion = VersionInfo.getVersion,
+          config = buildConf(),
+          ivyPath = ivyPath).createClient()
       }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 
'field list'")
@@ -99,7 +107,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      client = IsolatedClientLoader.forVersion(version, buildConf(), 
ivyPath).createClient()
+      client =
+        IsolatedClientLoader.forVersion(
+          hiveMetastoreVersion = version,
+          hadoopVersion = VersionInfo.getVersion,
+          config = buildConf(),
+          ivyPath = ivyPath).createClient()
     }
 
     test(s"$version: createDatabase") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to