This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5627ceeddb4 Revert "[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client" 5627ceeddb4 is described below commit 5627ceeddb45f2796fb8ad08b9f1c8a163823b2b Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Tue Feb 28 13:05:04 2023 +0900 Revert "[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client" This reverts commit 27ad5830f9aee616d25301b19aa7059d394fb942. --- .../main/scala/org/apache/spark/TestUtils.scala | 5 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 53 +++++++++++++- .../sql/hive/client/IsolatedClientLoader.scala | 83 ++++++++++++---------- .../org/apache/spark/sql/hive/HiveUtilsSuite.scala | 34 +-------- 4 files changed, 97 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 13ae6aca38b..bdf81d22efa 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -193,15 +193,12 @@ private[spark] object TestUtils { baseClass: String = null, classpathUrls: Seq[URL] = Seq.empty, implementsClasses: Seq[String] = Seq.empty, - extraCodeBody: String = "", - packageName: Option[String] = None): File = { + extraCodeBody: String = ""): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val implementsText = "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") - val packageText = packageName.map(p => s"package $p;\n").getOrElse("") val sourceFile = new JavaSourceFromString(className, s""" - |$packageText |public class $className $extendsText $implementsText { | @Override public String toString() { return "$toStringValue"; } | diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 1a0cac42fa7..fe9bdef3d0e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import java.io.File -import java.net.URL +import java.net.{URL, URLClassLoader} import java.util.Locale import java.util.concurrent.TimeUnit @@ -26,9 +26,11 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.util.Try +import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.util.VersionInfo import org.apache.hive.common.util.HiveVersionInfo @@ -44,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ChildFirstURLClassLoader, Utils} private[spark] object HiveUtils extends Logging { @@ -319,6 +321,22 @@ private[spark] object HiveUtils extends Logging { (commonTimeVars ++ hardcodingTimeVars).toMap } + /** + * Check current Thread's SessionState type + * @return true when SessionState.get returns an instance of CliSessionState, + * false when it gets non-CliSessionState instance or null + */ + def isCliSessionState(): Boolean = { + val state = SessionState.get + var temp: Class[_] = if (state != null) state.getClass else null + var found = false + while (temp != null && !found) { + found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState" + temp = temp.getSuperclass + } + found + } + /** * Create a [[HiveClient]] used for execution. 
* @@ -391,14 +409,43 @@ private[spark] object HiveUtils extends Logging { s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.") } + // We recursively find all jars in the class loader chain, + // starting from the given classLoader. + def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { + case null => Array.empty[URL] + case childFirst: ChildFirstURLClassLoader => + childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader) + case urlClassLoader: URLClassLoader => + urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) + case other => allJars(other.getParent) + } + + val classLoader = Utils.getContextOrSparkClassLoader + val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + // Do nothing. The system classloader is no longer a URLClassLoader in Java 9, + // so it won't match the case in allJars. It no longer exposes URLs of + // the system classpath + Array.empty[URL] + } else { + val loadedJars = allJars(classLoader) + // Verify at least one jar was found + if (loadedJars.length == 0) { + throw new IllegalArgumentException( + "Unable to locate hive jars to connect to metastore. 
" + + s"Please set ${HIVE_METASTORE_JARS.key}.") + } + loadedJars + } + logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, + execJars = jars.toSeq, config = configurations, - isolationOn = false, + isolationOn = !isCliSessionState(), barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "maven") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 879b2451cae..e65e6d42937 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -232,46 +232,51 @@ private[hive] class IsolatedClientLoader( private[hive] val classLoader: MutableURLClassLoader = { val isolatedClassLoader = if (isolationOn) { - val rootClassLoader: ClassLoader = - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - // In Java 9, the boot classloader can see few JDK classes. The intended parent - // classloader for delegation is now the platform classloader. - // See http://java9.wtf/class-loading/ - val platformCL = - classOf[ClassLoader].getMethod("getPlatformClassLoader"). - invoke(null).asInstanceOf[ClassLoader] - // Check to make sure that the root classloader does not know about Hive. 
- assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) - platformCL - } else { - // The boot classloader is represented by null (the instance itself isn't accessible) - // and before Java 9 can see all JDK classes - null - } - new URLClassLoader(allJars, rootClassLoader) { - override def loadClass(name: String, resolve: Boolean): Class[_] = { - val loaded = findLoadedClass(name) - if (loaded == null) doLoadClass(name, resolve) else loaded - } - def doLoadClass(name: String, resolve: Boolean): Class[_] = { - val classFileName = name.replaceAll("\\.", "/") + ".class" - if (isBarrierClass(name)) { - // For barrier classes, we construct a new copy of the class. - val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) - logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") - defineClass(name, bytes, 0, bytes.length) - } else if (!isSharedClass(name)) { - logDebug(s"hive class: $name - ${getResource(classToPath(name))}") - super.loadClass(name, resolve) + if (allJars.isEmpty) { + // See HiveUtils; this is the Java 9+ + builtin mode scenario + baseClassLoader + } else { + val rootClassLoader: ClassLoader = + if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + // In Java 9, the boot classloader can see few JDK classes. The intended parent + // classloader for delegation is now the platform classloader. + // See http://java9.wtf/class-loading/ + val platformCL = + classOf[ClassLoader].getMethod("getPlatformClassLoader"). + invoke(null).asInstanceOf[ClassLoader] + // Check to make sure that the root classloader does not know about Hive. + assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) + platformCL } else { - // For shared classes, we delegate to baseClassLoader, but fall back in case the - // class is not found. 
- logDebug(s"shared class: $name") - try { - baseClassLoader.loadClass(name) - } catch { - case _: ClassNotFoundException => - super.loadClass(name, resolve) + // The boot classloader is represented by null (the instance itself isn't accessible) + // and before Java 9 can see all JDK classes + null + } + new URLClassLoader(allJars, rootClassLoader) { + override def loadClass(name: String, resolve: Boolean): Class[_] = { + val loaded = findLoadedClass(name) + if (loaded == null) doLoadClass(name, resolve) else loaded + } + def doLoadClass(name: String, resolve: Boolean): Class[_] = { + val classFileName = name.replaceAll("\\.", "/") + ".class" + if (isBarrierClass(name)) { + // For barrier classes, we construct a new copy of the class. + val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) + logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") + defineClass(name, bytes, 0, bytes.length) + } else if (!isSharedClass(name)) { + logDebug(s"hive class: $name - ${getResource(classToPath(name))}") + super.loadClass(name, resolve) + } else { + // For shared classes, we delegate to baseClassLoader, but fall back in case the + // class is not found. 
+ logDebug(s"shared class: $name") + try { + baseClassLoader.loadClass(name) + } catch { + case _: ClassNotFoundException => + super.loadClass(name, resolve) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 823ac8ed957..d8e1e012928 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,19 +17,15 @@ package org.apache.spark.sql.hive -import java.io.File -import java.net.URI - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.{SparkConf, TestUtils} +import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} +import org.apache.spark.util.ChildFirstURLClassLoader class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -81,32 +77,6 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton } } - test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") { - withTempDir { tmpDir => - val classFile = TestUtils.createCompiledClass( - "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata")) - - val jarFile = new File(tmpDir, "hive-fake.jar") - TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata")) - - val conf = new SparkConf - val contextClassLoader = Thread.currentThread().getContextClassLoader - val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader) - try { - 
Thread.currentThread().setContextClassLoader(loader) - val client = HiveUtils.newClientForMetadata( - conf, - SparkHadoopUtil.newConfiguration(conf), - HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) - client.createDatabase( - CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()), - ignoreIfExists = true) - } finally { - Thread.currentThread().setContextClassLoader(contextClassLoader) - } - } - } - test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") { // Test default value val defaultConf = new Configuration --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org