This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new e22a095879 [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
e22a095879 is described below

commit e22a095879fa85cc0b9a9b32e4bbccbe199ad16b
Author: Cheng Pan <[email protected]>
AuthorDate: Fri May 16 11:47:35 2025 +0800

    [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
    
    There were some breaking changes after we fixed compatibility for Spark 
4.0.0 RC1 in #6920, but Spark has now reached 4.0.0 RC6, which is less likely 
to receive further breaking changes.
    
    Changes are extracted from https://github.com/apache/kyuubi/pull/6928, 
which passed CI with Spark 4.0.0 RC6.
    
    No.
    
    Closes #7061 from pan3793/6920-followup.
    
    Closes #6920
    
    17a1bd9e5 [Cheng Pan] [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports 
Spark 4.0
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/connector/hive/HiveConnectorUtils.scala  |  69 +++++++--
 .../spark/connector/hive/HiveTableCatalog.scala    | 172 +++++++++++++++++++--
 .../connector/hive/write/HiveWriteHelper.scala     |  34 ++--
 3 files changed, 224 insertions(+), 51 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 2d5f4c08f8..371d79abe7 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.net.URI
 
 import scala.util.Try
 
@@ -25,12 +26,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
-import 
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
 calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, 
PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
@@ -82,7 +82,28 @@ object HiveConnectorUtils extends Logging {
       isSplitable: JBoolean,
       maxSplitBytes: JLong,
       partitionValues: InternalRow): Seq[PartitionedFile] =
-    Try { // SPARK-42821: 4.0.0-preview2
+    Try { // SPARK-42821, SPARK-51185: Spark 4.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          fileStatusWithMetadataClz,
+          classOf[Path],
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .buildChecked()
@@ -192,6 +213,29 @@ object HiveConnectorUtils extends Logging {
       file.asInstanceOf[FileStatus].getPath
     }.get
 
+  private def calculateMultipleLocationSizes(
+      sparkSession: SparkSession,
+      tid: TableIdentifier,
+      paths: Seq[Option[URI]]): Seq[Long] = {
+
+    val sparkSessionClz = DynClasses.builder()
+      .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+      .impl("org.apache.spark.sql.SparkSession")
+      .build()
+
+    val calculateMultipleLocationSizesMethod =
+      DynMethods.builder("calculateMultipleLocationSizes")
+        .impl(
+          CommandUtils.getClass,
+          sparkSessionClz,
+          classOf[TableIdentifier],
+          classOf[Seq[Option[URI]]])
+        .buildChecked(CommandUtils)
+
+    calculateMultipleLocationSizesMethod
+      .invokeChecked[Seq[Long]](sparkSession, tid, paths)
+  }
+
   def calculateTotalSize(
       spark: SparkSession,
       catalogTable: CatalogTable,
@@ -199,12 +243,11 @@ object HiveConnectorUtils extends Logging {
     val sessionState = spark.sessionState
     val startTime = System.nanoTime()
     val (totalSize, newPartitions) = if 
(catalogTable.partitionColumnNames.isEmpty) {
-      (
-        calculateSingleLocationSize(
-          sessionState,
-          catalogTable.identifier,
-          catalogTable.storage.locationUri),
-        Seq())
+      val tableSize = CommandUtils.calculateSingleLocationSize(
+        sessionState,
+        catalogTable.identifier,
+        catalogTable.storage.locationUri)
+      (tableSize, Seq())
     } else {
       // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
       val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
@@ -402,7 +445,13 @@ object HiveConnectorUtils extends Logging {
     new StructType(newFields)
   }
 
-  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+  // This is a fork of Spark's withSQLConf, and we use a different name to 
avoid linkage
+  // issue on cross-version cases.
+  // For example, SPARK-46227(4.0.0) moves `withSQLConf` from SQLHelper to 
SQLConfHelper,
+  // classes that extend SQLConfHelper will prefer to linkage super class's 
method when
+  // compiling with Spark 4.0, then linkage error will happen when run the jar 
with lower
+  // Spark versions.
+  def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
     val conf = SQLConf.get
     val (keys, values) = pairs.unzip
     val currentValues = keys.map { key =>
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c128d67f1f..f72881f928 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -17,17 +17,19 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import java.lang.{Boolean => JBoolean, Long => JLong}
 import java.net.URI
 import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, SQLConfHelper, 
TableIdentifier}
 import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -44,9 +46,10 @@ import 
org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
+import 
org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
 import 
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider,
 toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
 import 
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
 
 /**
  * A [[TableCatalog]] that wrap HiveExternalCatalog to as V2 CatalogPlugin 
instance to access Hive.
@@ -100,6 +103,20 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalogName
   }
 
+  private def newHiveMetastoreCatalog(sparkSession: SparkSession): 
HiveMetastoreCatalog = {
+    val sparkSessionClz = DynClasses.builder()
+      .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+      .impl("org.apache.spark.sql.SparkSession")
+      .buildChecked()
+
+    val hiveMetastoreCatalogCtor =
+      DynConstructors.builder()
+        .impl("org.apache.spark.sql.hive.HiveMetastoreCatalog", 
sparkSessionClz)
+        .buildChecked[HiveMetastoreCatalog]()
+
+    hiveMetastoreCatalogCtor.newInstanceChecked(sparkSession)
+  }
+
   override def initialize(name: String, options: CaseInsensitiveStringMap): 
Unit = {
     assert(catalogName == null, "The Hive table catalog is already initialed.")
     assert(
@@ -110,7 +127,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalog = new HiveSessionCatalog(
       externalCatalogBuilder = () => externalCatalog,
       globalTempViewManagerBuilder = () => globalTempViewManager,
-      metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+      metastoreCatalog = newHiveMetastoreCatalog(sparkSession),
       functionRegistry = sessionState.functionRegistry,
       tableFunctionRegistry = sessionState.tableFunctionRegistry,
       hadoopConf = hadoopConf,
@@ -148,7 +165,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override val defaultNamespace: Array[String] = Array("default")
 
   override def listTables(namespace: Array[String]): Array[Identifier] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           catalog
@@ -162,16 +179,139 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def loadTable(ident: Identifier): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       HiveTable(sparkSession, 
catalog.getTableMetadata(ident.asTableIdentifier), this)
     }
 
+  // scalastyle:off
+  private def newCatalogTable(
+      identifier: TableIdentifier,
+      tableType: CatalogTableType,
+      storage: CatalogStorageFormat,
+      schema: StructType,
+      provider: Option[String] = None,
+      partitionColumnNames: Seq[String] = Seq.empty,
+      bucketSpec: Option[BucketSpec] = None,
+      owner: String = 
Option(CurrentUserContext.CURRENT_USER.get()).getOrElse(""),
+      createTime: JLong = System.currentTimeMillis,
+      lastAccessTime: JLong = -1,
+      createVersion: String = "",
+      properties: Map[String, String] = Map.empty,
+      stats: Option[CatalogStatistics] = None,
+      viewText: Option[String] = None,
+      comment: Option[String] = None,
+      collation: Option[String] = None,
+      unsupportedFeatures: Seq[String] = Seq.empty,
+      tracksPartitionsInCatalog: JBoolean = false,
+      schemaPreservesCase: JBoolean = true,
+      ignoredProperties: Map[String, String] = Map.empty,
+      viewOriginalText: Option[String] = None): CatalogTable = {
+    // scalastyle:on
+    Try { // SPARK-50675 (4.0.0)
+      DynConstructors.builder()
+        .impl(
+          classOf[CatalogTable],
+          classOf[TableIdentifier],
+          classOf[CatalogTableType],
+          classOf[CatalogStorageFormat],
+          classOf[StructType],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Option[BucketSpec]],
+          classOf[String],
+          classOf[Long],
+          classOf[Long],
+          classOf[String],
+          classOf[Map[String, String]],
+          classOf[Option[CatalogStatistics]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Boolean],
+          classOf[Boolean],
+          classOf[Map[String, String]],
+          classOf[Option[String]])
+        .buildChecked()
+        .invokeChecked[CatalogTable](
+          null,
+          identifier,
+          tableType,
+          storage,
+          schema,
+          provider,
+          partitionColumnNames,
+          bucketSpec,
+          owner,
+          createTime,
+          lastAccessTime,
+          createVersion,
+          properties,
+          stats,
+          viewText,
+          comment,
+          collation,
+          unsupportedFeatures,
+          tracksPartitionsInCatalog,
+          schemaPreservesCase,
+          ignoredProperties,
+          viewOriginalText)
+    }.recover { case _: Exception => // Spark 3.5 and previous
+      DynConstructors.builder()
+        .impl(
+          classOf[CatalogTable],
+          classOf[TableIdentifier],
+          classOf[CatalogTableType],
+          classOf[CatalogStorageFormat],
+          classOf[StructType],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Option[BucketSpec]],
+          classOf[String],
+          classOf[Long],
+          classOf[Long],
+          classOf[String],
+          classOf[Map[String, String]],
+          classOf[Option[CatalogStatistics]],
+          classOf[Option[String]],
+          classOf[Option[String]],
+          classOf[Seq[String]],
+          classOf[Boolean],
+          classOf[Boolean],
+          classOf[Map[String, String]],
+          classOf[Option[String]])
+        .buildChecked()
+        .invokeChecked[CatalogTable](
+          null,
+          identifier,
+          tableType,
+          storage,
+          schema,
+          provider,
+          partitionColumnNames,
+          bucketSpec,
+          owner,
+          createTime,
+          lastAccessTime,
+          createVersion,
+          properties,
+          stats,
+          viewText,
+          comment,
+          unsupportedFeatures,
+          tracksPartitionsInCatalog,
+          schemaPreservesCase,
+          ignoredProperties,
+          viewOriginalText)
+    }.get
+  }
+
   override def createTable(
       ident: Identifier,
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
       val (partitionColumns, maybeBucketSpec) = 
partitions.toSeq.convertTransforms
       val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -190,7 +330,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
           CatalogTableType.MANAGED
         }
 
-      val tableDesc = CatalogTable(
+      val tableDesc = newCatalogTable(
         identifier = ident.asTableIdentifier,
         tableType = tableType,
         storage = storage,
@@ -213,7 +353,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       val catalogTable =
         try {
           catalog.getTableMetadata(ident.asTableIdentifier)
@@ -253,7 +393,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def dropTable(ident: Identifier): Boolean =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       try {
         if (loadTable(ident) != null) {
           catalog.dropTable(
@@ -271,7 +411,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       if (tableExists(newIdent)) {
         throw new TableAlreadyExistsException(newIdent)
       }
@@ -288,12 +428,12 @@ class HiveTableCatalog(sparkSession: SparkSession)
   }
 
   override def listNamespaces(): Array[Array[String]] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       catalog.listDatabases().map(Array(_)).toArray
     }
 
   override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array() =>
           listNamespaces()
@@ -305,7 +445,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def loadNamespaceMetadata(namespace: Array[String]): 
util.Map[String, String] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           try {
@@ -323,7 +463,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override def createNamespace(
       namespace: Array[String],
       metadata: util.Map[String, String]): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) if !catalog.databaseExists(db) =>
           catalog.createDatabase(
@@ -339,7 +479,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def alterNamespace(namespace: Array[String], changes: 
NamespaceChange*): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           // validate that this catalog's reserved properties are not removed
@@ -379,7 +519,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override def dropNamespace(
       namespace: Array[String],
       cascade: Boolean): Boolean =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) if catalog.databaseExists(db) =>
           catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index c3e73c0117..558ac8ee91 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, 
HiveExternalCatalog, HiveVersion}
+import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog
+
+import org.apache.kyuubi.util.SemanticVersion
 
 // scalastyle:off line.size.limit
 /**
@@ -48,8 +50,6 @@ object HiveWriteHelper extends Logging {
       hadoopConf: Configuration,
       path: Path): Path = {
 
-    import hive._
-
     // Before Hive 1.1, when inserting into a table, Hive will create the 
staging directory under
     // a common scratch directory. After the writing is finished, Hive will 
simply empty the table
     // directory and move the staging directory to it.
@@ -59,24 +59,15 @@ object HiveWriteHelper extends Logging {
     // We have to follow the Hive behavior here, to avoid troubles. For 
example, if we create
     // staging directory under the table director for Hive prior to 1.1, the 
staging directory will
     // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, 
v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ 
hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    val hiveVersion = 
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+    val hiveVersion = SemanticVersion(
+      
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version.fullVersion)
     val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
     val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
 
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+    if (hiveVersion < "1.1") {
       oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
     } else {
-      throw new IllegalStateException("Unsupported hive version: " + 
hiveVersion.fullVersion)
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
     }
   }
 
@@ -96,7 +87,7 @@ object HiveWriteHelper extends Logging {
     var dirPath = new Path(
       extURI.getScheme,
       extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID)
 
     try {
       val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
@@ -120,19 +111,12 @@ object HiveWriteHelper extends Logging {
       stagingDir: String): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+      new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // 
Hive uses 10000
     } else {
       new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), 
"-ext-10000")
     }
   }
 
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // 
Hive uses 10000
-  }
-
   private def getExternalScratchDir(
       extURI: URI,
       hadoopConf: Configuration,

Reply via email to