This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new e22a095879 [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
e22a095879 is described below
commit e22a095879fa85cc0b9a9b32e4bbccbe199ad16b
Author: Cheng Pan <[email protected]>
AuthorDate: Fri May 16 11:47:35 2025 +0800
[KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
There were some breaking changes after we fixed compatibility for Spark
4.0.0 RC1 in #6920, but now Spark has reached 4.0.0 RC6, which is less likely
to receive further breaking changes.
Changes are extracted from https://github.com/apache/kyuubi/pull/6928,
which passed CI with Spark 4.0.0 RC6.
No.
Closes #7061 from pan3793/6920-followup.
Closes #6920
17a1bd9e5 [Cheng Pan] [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports
Spark 4.0
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/hive/HiveConnectorUtils.scala | 69 +++++++--
.../spark/connector/hive/HiveTableCatalog.scala | 172 +++++++++++++++++++--
.../connector/hive/write/HiveWriteHelper.scala | 34 ++--
3 files changed, 224 insertions(+), 51 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 2d5f4c08f8..371d79abe7 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.net.URI
import scala.util.Try
@@ -25,12 +26,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable,
CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
-import
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory,
PartitionedFile}
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
@@ -82,7 +82,28 @@ object HiveConnectorUtils extends Logging {
isSplitable: JBoolean,
maxSplitBytes: JLong,
partitionValues: InternalRow): Seq[PartitionedFile] =
- Try { // SPARK-42821: 4.0.0-preview2
+ Try { // SPARK-42821, SPARK-51185: Spark 4.0
+ val fileStatusWithMetadataClz = DynClasses.builder()
+
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .buildChecked()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ fileStatusWithMetadataClz,
+ classOf[Path],
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
+ null,
+ file,
+ filePath,
+ isSplitable,
+ maxSplitBytes,
+ partitionValues)
+ }.recover { case _: Exception => // SPARK-42821: 4.0.0-preview2
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.buildChecked()
@@ -192,6 +213,29 @@ object HiveConnectorUtils extends Logging {
file.asInstanceOf[FileStatus].getPath
}.get
+ private def calculateMultipleLocationSizes(
+ sparkSession: SparkSession,
+ tid: TableIdentifier,
+ paths: Seq[Option[URI]]): Seq[Long] = {
+
+ val sparkSessionClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.SparkSession")
+ .build()
+
+ val calculateMultipleLocationSizesMethod =
+ DynMethods.builder("calculateMultipleLocationSizes")
+ .impl(
+ CommandUtils.getClass,
+ sparkSessionClz,
+ classOf[TableIdentifier],
+ classOf[Seq[Option[URI]]])
+ .buildChecked(CommandUtils)
+
+ calculateMultipleLocationSizesMethod
+ .invokeChecked[Seq[Long]](sparkSession, tid, paths)
+ }
+
def calculateTotalSize(
spark: SparkSession,
catalogTable: CatalogTable,
@@ -199,12 +243,11 @@ object HiveConnectorUtils extends Logging {
val sessionState = spark.sessionState
val startTime = System.nanoTime()
val (totalSize, newPartitions) = if
(catalogTable.partitionColumnNames.isEmpty) {
- (
- calculateSingleLocationSize(
- sessionState,
- catalogTable.identifier,
- catalogTable.storage.locationUri),
- Seq())
+ val tableSize = CommandUtils.calculateSingleLocationSize(
+ sessionState,
+ catalogTable.identifier,
+ catalogTable.storage.locationUri)
+ (tableSize, Seq())
} else {
// Calculate table size as a sum of the visible partitions. See
SPARK-21079
val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
@@ -402,7 +445,13 @@ object HiveConnectorUtils extends Logging {
new StructType(newFields)
}
- def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ // This is a fork of Spark's withSQLConf, and we use a different name to
avoid linkage
+ // issue on cross-version cases.
+ // For example, SPARK-46227(4.0.0) moves `withSQLConf` from SQLHelper to
SQLConfHelper,
+ // classes that extend SQLConfHelper will prefer to linkage super class's
method when
+ // compiling with Spark 4.0, then linkage error will happen when run the jar
with lower
+ // Spark versions.
+ def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map { key =>
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c128d67f1f..f72881f928 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -17,17 +17,19 @@
package org.apache.kyuubi.spark.connector.hive
+import java.lang.{Boolean => JBoolean, Long => JLong}
import java.net.URI
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, SQLConfHelper,
TableIdentifier}
import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -44,9 +46,10 @@ import
org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
+import
org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
import
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider,
toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
/**
* A [[TableCatalog]] that wrap HiveExternalCatalog to as V2 CatalogPlugin
instance to access Hive.
@@ -100,6 +103,20 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalogName
}
+ private def newHiveMetastoreCatalog(sparkSession: SparkSession):
HiveMetastoreCatalog = {
+ val sparkSessionClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.SparkSession")
+ .buildChecked()
+
+ val hiveMetastoreCatalogCtor =
+ DynConstructors.builder()
+ .impl("org.apache.spark.sql.hive.HiveMetastoreCatalog",
sparkSessionClz)
+ .buildChecked[HiveMetastoreCatalog]()
+
+ hiveMetastoreCatalogCtor.newInstanceChecked(sparkSession)
+ }
+
override def initialize(name: String, options: CaseInsensitiveStringMap):
Unit = {
assert(catalogName == null, "The Hive table catalog is already initialed.")
assert(
@@ -110,7 +127,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalog = new HiveSessionCatalog(
externalCatalogBuilder = () => externalCatalog,
globalTempViewManagerBuilder = () => globalTempViewManager,
- metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+ metastoreCatalog = newHiveMetastoreCatalog(sparkSession),
functionRegistry = sessionState.functionRegistry,
tableFunctionRegistry = sessionState.tableFunctionRegistry,
hadoopConf = hadoopConf,
@@ -148,7 +165,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override val defaultNamespace: Array[String] = Array("default")
override def listTables(namespace: Array[String]): Array[Identifier] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
catalog
@@ -162,16 +179,139 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadTable(ident: Identifier): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
HiveTable(sparkSession,
catalog.getTableMetadata(ident.asTableIdentifier), this)
}
+ // scalastyle:off
+ private def newCatalogTable(
+ identifier: TableIdentifier,
+ tableType: CatalogTableType,
+ storage: CatalogStorageFormat,
+ schema: StructType,
+ provider: Option[String] = None,
+ partitionColumnNames: Seq[String] = Seq.empty,
+ bucketSpec: Option[BucketSpec] = None,
+ owner: String =
Option(CurrentUserContext.CURRENT_USER.get()).getOrElse(""),
+ createTime: JLong = System.currentTimeMillis,
+ lastAccessTime: JLong = -1,
+ createVersion: String = "",
+ properties: Map[String, String] = Map.empty,
+ stats: Option[CatalogStatistics] = None,
+ viewText: Option[String] = None,
+ comment: Option[String] = None,
+ collation: Option[String] = None,
+ unsupportedFeatures: Seq[String] = Seq.empty,
+ tracksPartitionsInCatalog: JBoolean = false,
+ schemaPreservesCase: JBoolean = true,
+ ignoredProperties: Map[String, String] = Map.empty,
+ viewOriginalText: Option[String] = None): CatalogTable = {
+ // scalastyle:on
+ Try { // SPARK-50675 (4.0.0)
+ DynConstructors.builder()
+ .impl(
+ classOf[CatalogTable],
+ classOf[TableIdentifier],
+ classOf[CatalogTableType],
+ classOf[CatalogStorageFormat],
+ classOf[StructType],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Option[BucketSpec]],
+ classOf[String],
+ classOf[Long],
+ classOf[Long],
+ classOf[String],
+ classOf[Map[String, String]],
+ classOf[Option[CatalogStatistics]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Boolean],
+ classOf[Boolean],
+ classOf[Map[String, String]],
+ classOf[Option[String]])
+ .buildChecked()
+ .invokeChecked[CatalogTable](
+ null,
+ identifier,
+ tableType,
+ storage,
+ schema,
+ provider,
+ partitionColumnNames,
+ bucketSpec,
+ owner,
+ createTime,
+ lastAccessTime,
+ createVersion,
+ properties,
+ stats,
+ viewText,
+ comment,
+ collation,
+ unsupportedFeatures,
+ tracksPartitionsInCatalog,
+ schemaPreservesCase,
+ ignoredProperties,
+ viewOriginalText)
+ }.recover { case _: Exception => // Spark 3.5 and previous
+ DynConstructors.builder()
+ .impl(
+ classOf[CatalogTable],
+ classOf[TableIdentifier],
+ classOf[CatalogTableType],
+ classOf[CatalogStorageFormat],
+ classOf[StructType],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Option[BucketSpec]],
+ classOf[String],
+ classOf[Long],
+ classOf[Long],
+ classOf[String],
+ classOf[Map[String, String]],
+ classOf[Option[CatalogStatistics]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Boolean],
+ classOf[Boolean],
+ classOf[Map[String, String]],
+ classOf[Option[String]])
+ .buildChecked()
+ .invokeChecked[CatalogTable](
+ null,
+ identifier,
+ tableType,
+ storage,
+ schema,
+ provider,
+ partitionColumnNames,
+ bucketSpec,
+ owner,
+ createTime,
+ lastAccessTime,
+ createVersion,
+ properties,
+ stats,
+ viewText,
+ comment,
+ unsupportedFeatures,
+ tracksPartitionsInCatalog,
+ schemaPreservesCase,
+ ignoredProperties,
+ viewOriginalText)
+ }.get
+ }
+
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
import
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
val (partitionColumns, maybeBucketSpec) =
partitions.toSeq.convertTransforms
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -190,7 +330,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
CatalogTableType.MANAGED
}
- val tableDesc = CatalogTable(
+ val tableDesc = newCatalogTable(
identifier = ident.asTableIdentifier,
tableType = tableType,
storage = storage,
@@ -213,7 +353,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterTable(ident: Identifier, changes: TableChange*): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
val catalogTable =
try {
catalog.getTableMetadata(ident.asTableIdentifier)
@@ -253,7 +393,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def dropTable(ident: Identifier): Boolean =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
try {
if (loadTable(ident) != null) {
catalog.dropTable(
@@ -271,7 +411,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
if (tableExists(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}
@@ -288,12 +428,12 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def listNamespaces(): Array[Array[String]] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
catalog.listDatabases().map(Array(_)).toArray
}
override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array() =>
listNamespaces()
@@ -305,7 +445,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadNamespaceMetadata(namespace: Array[String]):
util.Map[String, String] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
try {
@@ -323,7 +463,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if !catalog.databaseExists(db) =>
catalog.createDatabase(
@@ -339,7 +479,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterNamespace(namespace: Array[String], changes:
NamespaceChange*): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
// validate that this catalog's reserved properties are not removed
@@ -379,7 +519,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def dropNamespace(
namespace: Array[String],
cascade: Boolean): Boolean =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if catalog.databaseExists(db) =>
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index c3e73c0117..558ac8ee91 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive,
HiveExternalCatalog, HiveVersion}
+import
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog
+
+import org.apache.kyuubi.util.SemanticVersion
// scalastyle:off line.size.limit
/**
@@ -48,8 +50,6 @@ object HiveWriteHelper extends Logging {
hadoopConf: Configuration,
path: Path): Path = {
- import hive._
-
// Before Hive 1.1, when inserting into a table, Hive will create the
staging directory under
// a common scratch directory. After the writing is finished, Hive will
simply empty the table
// directory and move the staging directory to it.
@@ -59,24 +59,15 @@ object HiveWriteHelper extends Logging {
// We have to follow the Hive behavior here, to avoid troubles. For
example, if we create
// staging directory under the table director for Hive prior to 1.1, the
staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
- val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13,
v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
- Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
- // Ensure all the supported versions are considered here.
- assert(hiveVersionsUsingNewExternalTempPath ++
hiveVersionsUsingOldExternalTempPath ==
- allSupportedHiveVersions)
-
- val hiveVersion =
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+ val hiveVersion = SemanticVersion(
+
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version.fullVersion)
val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
- if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+ if (hiveVersion < "1.1") {
oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
- throw new IllegalStateException("Unsupported hive version: " +
hiveVersion.fullVersion)
+ newVersionExternalTempPath(path, hadoopConf, stagingDir)
}
}
@@ -96,7 +87,7 @@ object HiveWriteHelper extends Logging {
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
- scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+ scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID)
try {
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
@@ -120,19 +111,12 @@ object HiveWriteHelper extends Logging {
stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+ new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") //
Hive uses 10000
} else {
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir),
"-ext-10000")
}
}
- private def getExtTmpPathRelTo(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") //
Hive uses 10000
- }
-
private def getExternalScratchDir(
extURI: URI,
hadoopConf: Configuration,