This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5627ceeddb4 Revert "[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client"
5627ceeddb4 is described below

commit 5627ceeddb45f2796fb8ad08b9f1c8a163823b2b
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue Feb 28 13:05:04 2023 +0900

    Revert "[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 
'builtin' Hive version for metadata client"
    
    This reverts commit 27ad5830f9aee616d25301b19aa7059d394fb942.
---
 .../main/scala/org/apache/spark/TestUtils.scala    |  5 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 53 +++++++++++++-
 .../sql/hive/client/IsolatedClientLoader.scala     | 83 ++++++++++++----------
 .../org/apache/spark/sql/hive/HiveUtilsSuite.scala | 34 +--------
 4 files changed, 97 insertions(+), 78 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 13ae6aca38b..bdf81d22efa 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -193,15 +193,12 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = "",
-      packageName: Option[String] = None): File = {
+      extraCodeBody: String = ""): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ 
"java.io.Serializable").mkString(", ")
-    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
-         |$packageText
          |public class $className $extendsText $implementsText {
          |  @Override public String toString() { return "$toStringValue"; }
          |
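For readers unfamiliar with this helper: TestUtils.createCompiledClass generates Java source like the template above and compiles it on the fly. A minimal, self-contained sketch of that runtime-compilation step using javax.tools (the Greeter class name and temp-dir setup are illustrative, not from the patch):

    import java.io.File
    import java.nio.file.Files
    import javax.tools.ToolProvider

    object CompileSketch {
      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDirectory("gen").toFile
        // "Greeter" is a made-up class name for this sketch.
        val src = new File(dir, "Greeter.java")
        Files.write(src.toPath,
          "public class Greeter { public String toString() { return \"hi\"; } }".getBytes)
        // Run the in-process system compiler; exit code 0 means success.
        val compiler = ToolProvider.getSystemJavaCompiler
        assert(compiler.run(null, null, null, src.getPath) == 0)
        // Greeter.class is written next to the source file.
        println(new File(dir, "Greeter.class").exists())
      }
    }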
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 1a0cac42fa7..fe9bdef3d0e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.URL
+import java.net.{URL, URLClassLoader}
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -26,9 +26,11 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
 import org.apache.hive.common.util.HiveVersionInfo
 
@@ -44,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
 
 
 private[spark] object HiveUtils extends Logging {
@@ -319,6 +321,22 @@ private[spark] object HiveUtils extends Logging {
     (commonTimeVars ++ hardcodingTimeVars).toMap
   }
 
+  /**
+   * Checks the current thread's SessionState type.
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it returns a non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -391,14 +409,43 @@ private[spark] object HiveUtils extends Logging {
             s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
+      // We recursively find all jars in the class loader chain,
+      // starting from the given classLoader.
+      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
+        case null => Array.empty[URL]
+        case childFirst: ChildFirstURLClassLoader =>
+          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
+        case urlClassLoader: URLClassLoader =>
+          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
+        case other => allJars(other.getParent)
+      }
+
+      val classLoader = Utils.getContextOrSparkClassLoader
+      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
+        // so it won't match the case in allJars. It no longer exposes URLs of
+        // the system classpath
+        Array.empty[URL]
+      } else {
+        val loadedJars = allJars(classLoader)
+        // Verify at least one jar was found
+        if (loadedJars.length == 0) {
+          throw new IllegalArgumentException(
+            "Unable to locate hive jars to connect to metastore. " +
+              s"Please set ${HIVE_METASTORE_JARS.key}.")
+        }
+        loadedJars
+      }
+
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
+        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = false,
+        isolationOn = !isCliSessionState(),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
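The restored isCliSessionState above walks the superclass chain comparing class names rather than using isInstanceOf, which keeps the check working even when CliSessionState was loaded by a different classloader. A standalone sketch of that pattern (the helper name is ours, and the ArrayList example is illustrative):

    // Returns true if obj's class, or any superclass of it, has the given
    // fully-qualified name. Comparing names instead of Class references
    // avoids classloader-identity problems.
    def hasAncestorNamed(obj: AnyRef, fqcn: String): Boolean = {
      var c: Class[_] = if (obj != null) obj.getClass else null
      while (c != null) {
        if (c.getName == fqcn) return true
        c = c.getSuperclass
      }
      false
    }

    // e.g. hasAncestorNamed(new java.util.ArrayList[Int](), "java.util.AbstractList")
    // returns true, while passing null returns false.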
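Likewise, the restored allJars helper gathers jar URLs by recursing up the classloader parent chain; a trimmed sketch of that traversal, without the Spark-specific ChildFirstURLClassLoader case:

    import java.net.{URL, URLClassLoader}

    // Collect the URLs of every URLClassLoader in the parent chain.
    // On Java 9+ the application classloader is no longer a URLClassLoader,
    // so this usually yields nothing there, which is exactly why the code
    // above short-circuits to Array.empty on Java 9+.
    def jarsInChain(cl: ClassLoader): Array[URL] = cl match {
      case null => Array.empty[URL]
      case u: URLClassLoader => u.getURLs ++ jarsInChain(u.getParent)
      case other => jarsInChain(other.getParent)
    }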
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 879b2451cae..e65e6d42937 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -232,46 +232,51 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader =
       if (isolationOn) {
-        val rootClassLoader: ClassLoader =
-          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-            // In Java 9, the boot classloader can see few JDK classes. The intended parent
-            // classloader for delegation is now the platform classloader.
-            // See http://java9.wtf/class-loading/
-            val platformCL =
-            classOf[ClassLoader].getMethod("getPlatformClassLoader").
-              invoke(null).asInstanceOf[ClassLoader]
-            // Check to make sure that the root classloader does not know about Hive.
-            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-            platformCL
-          } else {
-            // The boot classloader is represented by null (the instance itself isn't accessible)
-            // and before Java 9 can see all JDK classes
-            null
-          }
-        new URLClassLoader(allJars, rootClassLoader) {
-          override def loadClass(name: String, resolve: Boolean): Class[_] = {
-            val loaded = findLoadedClass(name)
-            if (loaded == null) doLoadClass(name, resolve) else loaded
-          }
-          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-            val classFileName = name.replaceAll("\\.", "/") + ".class"
-            if (isBarrierClass(name)) {
-              // For barrier classes, we construct a new copy of the class.
-              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-              logDebug(s"custom defining: $name - 
${util.Arrays.hashCode(bytes)}")
-              defineClass(name, bytes, 0, bytes.length)
-            } else if (!isSharedClass(name)) {
-              logDebug(s"hive class: $name - 
${getResource(classToPath(name))}")
-              super.loadClass(name, resolve)
+        if (allJars.isEmpty) {
+          // See HiveUtils; this is the Java 9+ + builtin mode scenario
+          baseClassLoader
+        } else {
+          val rootClassLoader: ClassLoader =
+            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+              // In Java 9, the boot classloader can see few JDK classes. The intended parent
+              // classloader for delegation is now the platform classloader.
+              // See http://java9.wtf/class-loading/
+              val platformCL =
+              classOf[ClassLoader].getMethod("getPlatformClassLoader").
+                invoke(null).asInstanceOf[ClassLoader]
+              // Check to make sure that the root classloader does not know about Hive.
+              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+              platformCL
             } else {
-              // For shared classes, we delegate to baseClassLoader, but fall back in case the
-              // class is not found.
-              logDebug(s"shared class: $name")
-              try {
-                baseClassLoader.loadClass(name)
-              } catch {
-                case _: ClassNotFoundException =>
-                  super.loadClass(name, resolve)
+              // The boot classloader is represented by null (the instance itself isn't accessible)
+              // and before Java 9 can see all JDK classes
+              null
+            }
+          new URLClassLoader(allJars, rootClassLoader) {
+            override def loadClass(name: String, resolve: Boolean): Class[_] = {
+              val loaded = findLoadedClass(name)
+              if (loaded == null) doLoadClass(name, resolve) else loaded
+            }
+            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+              val classFileName = name.replaceAll("\\.", "/") + ".class"
+              if (isBarrierClass(name)) {
+                // For barrier classes, we construct a new copy of the class.
+                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+                logDebug(s"custom defining: $name - 
${util.Arrays.hashCode(bytes)}")
+                defineClass(name, bytes, 0, bytes.length)
+              } else if (!isSharedClass(name)) {
+                logDebug(s"hive class: $name - 
${getResource(classToPath(name))}")
+                super.loadClass(name, resolve)
+              } else {
+                // For shared classes, we delegate to baseClassLoader, but fall back in case the
+                // class is not found.
+                logDebug(s"shared class: $name")
+                try {
+                  baseClassLoader.loadClass(name)
+                } catch {
+                  case _: ClassNotFoundException =>
+                    super.loadClass(name, resolve)
+                }
               }
             }
           }
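The loadClass override restored above encodes a three-way policy: barrier classes are re-defined from bytes inside the isolated loader, shared classes delegate to the base loader, and everything else resolves from the Hive jars. A stripped-down sketch of the same pattern, with plain function parameters standing in for isBarrierClass/isSharedClass (the class name and wiring here are ours, not the patch's):

    import java.net.{URL, URLClassLoader}
    import org.apache.commons.io.IOUtils

    class IsolatingLoader(
        jars: Array[URL],
        base: ClassLoader,
        isBarrier: String => Boolean,  // stand-in for isBarrierClass
        isShared: String => Boolean)   // stand-in for isSharedClass
      extends URLClassLoader(jars, null) {

      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        val loaded = findLoadedClass(name)
        if (loaded != null) return loaded
        if (isBarrier(name)) {
          // Define a fresh copy of the class inside this loader, reading the
          // bytes through the base loader's resources.
          val bytes = IOUtils.toByteArray(
            base.getResourceAsStream(name.replace('.', '/') + ".class"))
          defineClass(name, bytes, 0, bytes.length)
        } else if (isShared(name)) {
          // Shared classes come from the base loader so instances stay
          // compatible across the isolation boundary; fall back to the jars.
          try base.loadClass(name)
          catch { case _: ClassNotFoundException => super.loadClass(name, resolve) }
        } else {
          // Everything else resolves from the isolated Hive jars.
          super.loadClass(name, resolve)
        }
      }
    }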
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index 823ac8ed957..d8e1e012928 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -17,19 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
+import org.apache.spark.util.ChildFirstURLClassLoader
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
@@ -81,32 +77,6 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
-  test("SPARK-42539: User-provided JARs should not take precedence over 
builtin Hive JARs") {
-    withTempDir { tmpDir =>
-        val classFile = TestUtils.createCompiledClass(
-          "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
-
-      val jarFile = new File(tmpDir, "hive-fake.jar")
-      TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))
-
-      val conf = new SparkConf
-      val contextClassLoader = Thread.currentThread().getContextClassLoader
-      val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
-      try {
-        Thread.currentThread().setContextClassLoader(loader)
-        val client = HiveUtils.newClientForMetadata(
-          conf,
-          SparkHadoopUtil.newConfiguration(conf),
-          HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
-        client.createDatabase(
-          CatalogDatabase("foo", "", 
URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
-          ignoreIfExists = true)
-      } finally {
-        Thread.currentThread().setContextClassLoader(contextClassLoader)
-      }
-    }
-  }
-
   test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
     // Test default value
     val defaultConf = new Configuration

