This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new e366b0950f [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
e366b0950f is described below
commit e366b0950f71223388dd2ad7cf8396a67df115b2
Author: Cheng Pan <[email protected]>
AuthorDate: Fri May 16 11:47:35 2025 +0800
[KYUUBI #6920][FOLLOWUP] Spark SQL engine supports Spark 4.0
### Why are the changes needed?
There were some breaking changes after we fixed compatibility for Spark
4.0.0 RC1 in #6920, but Spark has now reached 4.0.0 RC6, which is less likely
to receive further breaking changes.
### How was this patch tested?
Changes are extracted from https://github.com/apache/kyuubi/pull/6928,
which passed CI with Spark 4.0.0 RC6
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7061 from pan3793/6920-followup.
Closes #6920
17a1bd9e5 [Cheng Pan] [KYUUBI #6920][FOLLOWUP] Spark SQL engine supports
Spark 4.0
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/hive/HiveConnectorUtils.scala | 61 +++++++--
.../spark/connector/hive/HiveTableCatalog.scala | 146 ++++++++++++++++++++-
.../spark/sql/execution/SparkPlanHelper.scala | 33 +++++
.../execution/arrow/KyuubiArrowConverters.scala | 4 +-
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 16 ++-
5 files changed, 239 insertions(+), 21 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index f56aa977b6..371d79abe7 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.net.URI
import scala.util.Try
@@ -25,12 +26,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable,
CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
-import
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory,
PartitionedFile}
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
@@ -82,7 +82,28 @@ object HiveConnectorUtils extends Logging {
isSplitable: JBoolean,
maxSplitBytes: JLong,
partitionValues: InternalRow): Seq[PartitionedFile] =
- Try { // SPARK-42821: 4.0.0-preview2
+ Try { // SPARK-42821, SPARK-51185: Spark 4.0
+ val fileStatusWithMetadataClz = DynClasses.builder()
+
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .buildChecked()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ fileStatusWithMetadataClz,
+ classOf[Path],
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
+ null,
+ file,
+ filePath,
+ isSplitable,
+ maxSplitBytes,
+ partitionValues)
+ }.recover { case _: Exception => // SPARK-42821: 4.0.0-preview2
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.buildChecked()
@@ -192,6 +213,29 @@ object HiveConnectorUtils extends Logging {
file.asInstanceOf[FileStatus].getPath
}.get
+ private def calculateMultipleLocationSizes(
+ sparkSession: SparkSession,
+ tid: TableIdentifier,
+ paths: Seq[Option[URI]]): Seq[Long] = {
+
+ val sparkSessionClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.SparkSession")
+ .build()
+
+ val calculateMultipleLocationSizesMethod =
+ DynMethods.builder("calculateMultipleLocationSizes")
+ .impl(
+ CommandUtils.getClass,
+ sparkSessionClz,
+ classOf[TableIdentifier],
+ classOf[Seq[Option[URI]]])
+ .buildChecked(CommandUtils)
+
+ calculateMultipleLocationSizesMethod
+ .invokeChecked[Seq[Long]](sparkSession, tid, paths)
+ }
+
def calculateTotalSize(
spark: SparkSession,
catalogTable: CatalogTable,
@@ -199,12 +243,11 @@ object HiveConnectorUtils extends Logging {
val sessionState = spark.sessionState
val startTime = System.nanoTime()
val (totalSize, newPartitions) = if
(catalogTable.partitionColumnNames.isEmpty) {
- (
- calculateSingleLocationSize(
- sessionState,
- catalogTable.identifier,
- catalogTable.storage.locationUri),
- Seq())
+ val tableSize = CommandUtils.calculateSingleLocationSize(
+ sessionState,
+ catalogTable.identifier,
+ catalogTable.storage.locationUri)
+ (tableSize, Seq())
} else {
// Calculate table size as a sum of the visible partitions. See
SPARK-21079
val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
diff --git
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 91088d7877..f72881f928 100644
---
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -17,17 +17,19 @@
package org.apache.kyuubi.spark.connector.hive
+import java.lang.{Boolean => JBoolean, Long => JLong}
import java.net.URI
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, SQLConfHelper,
TableIdentifier}
import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -47,6 +49,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import
org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
import
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider,
toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
/**
* A [[TableCatalog]] that wrap HiveExternalCatalog to as V2 CatalogPlugin
instance to access Hive.
@@ -100,6 +103,20 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalogName
}
+ private def newHiveMetastoreCatalog(sparkSession: SparkSession):
HiveMetastoreCatalog = {
+ val sparkSessionClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
+ .impl("org.apache.spark.sql.SparkSession")
+ .buildChecked()
+
+ val hiveMetastoreCatalogCtor =
+ DynConstructors.builder()
+ .impl("org.apache.spark.sql.hive.HiveMetastoreCatalog",
sparkSessionClz)
+ .buildChecked[HiveMetastoreCatalog]()
+
+ hiveMetastoreCatalogCtor.newInstanceChecked(sparkSession)
+ }
+
override def initialize(name: String, options: CaseInsensitiveStringMap):
Unit = {
assert(catalogName == null, "The Hive table catalog is already initialed.")
assert(
@@ -110,7 +127,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalog = new HiveSessionCatalog(
externalCatalogBuilder = () => externalCatalog,
globalTempViewManagerBuilder = () => globalTempViewManager,
- metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+ metastoreCatalog = newHiveMetastoreCatalog(sparkSession),
functionRegistry = sessionState.functionRegistry,
tableFunctionRegistry = sessionState.tableFunctionRegistry,
hadoopConf = hadoopConf,
@@ -166,6 +183,129 @@ class HiveTableCatalog(sparkSession: SparkSession)
HiveTable(sparkSession,
catalog.getTableMetadata(ident.asTableIdentifier), this)
}
+ // scalastyle:off
+ private def newCatalogTable(
+ identifier: TableIdentifier,
+ tableType: CatalogTableType,
+ storage: CatalogStorageFormat,
+ schema: StructType,
+ provider: Option[String] = None,
+ partitionColumnNames: Seq[String] = Seq.empty,
+ bucketSpec: Option[BucketSpec] = None,
+ owner: String =
Option(CurrentUserContext.CURRENT_USER.get()).getOrElse(""),
+ createTime: JLong = System.currentTimeMillis,
+ lastAccessTime: JLong = -1,
+ createVersion: String = "",
+ properties: Map[String, String] = Map.empty,
+ stats: Option[CatalogStatistics] = None,
+ viewText: Option[String] = None,
+ comment: Option[String] = None,
+ collation: Option[String] = None,
+ unsupportedFeatures: Seq[String] = Seq.empty,
+ tracksPartitionsInCatalog: JBoolean = false,
+ schemaPreservesCase: JBoolean = true,
+ ignoredProperties: Map[String, String] = Map.empty,
+ viewOriginalText: Option[String] = None): CatalogTable = {
+ // scalastyle:on
+ Try { // SPARK-50675 (4.0.0)
+ DynConstructors.builder()
+ .impl(
+ classOf[CatalogTable],
+ classOf[TableIdentifier],
+ classOf[CatalogTableType],
+ classOf[CatalogStorageFormat],
+ classOf[StructType],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Option[BucketSpec]],
+ classOf[String],
+ classOf[Long],
+ classOf[Long],
+ classOf[String],
+ classOf[Map[String, String]],
+ classOf[Option[CatalogStatistics]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Boolean],
+ classOf[Boolean],
+ classOf[Map[String, String]],
+ classOf[Option[String]])
+ .buildChecked()
+ .invokeChecked[CatalogTable](
+ null,
+ identifier,
+ tableType,
+ storage,
+ schema,
+ provider,
+ partitionColumnNames,
+ bucketSpec,
+ owner,
+ createTime,
+ lastAccessTime,
+ createVersion,
+ properties,
+ stats,
+ viewText,
+ comment,
+ collation,
+ unsupportedFeatures,
+ tracksPartitionsInCatalog,
+ schemaPreservesCase,
+ ignoredProperties,
+ viewOriginalText)
+ }.recover { case _: Exception => // Spark 3.5 and previous
+ DynConstructors.builder()
+ .impl(
+ classOf[CatalogTable],
+ classOf[TableIdentifier],
+ classOf[CatalogTableType],
+ classOf[CatalogStorageFormat],
+ classOf[StructType],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Option[BucketSpec]],
+ classOf[String],
+ classOf[Long],
+ classOf[Long],
+ classOf[String],
+ classOf[Map[String, String]],
+ classOf[Option[CatalogStatistics]],
+ classOf[Option[String]],
+ classOf[Option[String]],
+ classOf[Seq[String]],
+ classOf[Boolean],
+ classOf[Boolean],
+ classOf[Map[String, String]],
+ classOf[Option[String]])
+ .buildChecked()
+ .invokeChecked[CatalogTable](
+ null,
+ identifier,
+ tableType,
+ storage,
+ schema,
+ provider,
+ partitionColumnNames,
+ bucketSpec,
+ owner,
+ createTime,
+ lastAccessTime,
+ createVersion,
+ properties,
+ stats,
+ viewText,
+ comment,
+ unsupportedFeatures,
+ tracksPartitionsInCatalog,
+ schemaPreservesCase,
+ ignoredProperties,
+ viewOriginalText)
+ }.get
+ }
+
override def createTable(
ident: Identifier,
schema: StructType,
@@ -190,7 +330,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
CatalogTableType.MANAGED
}
- val tableDesc = CatalogTable(
+ val tableDesc = newCatalogTable(
identifier = ident.asTableIdentifier,
tableType = tableType,
storage = storage,
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkPlanHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkPlanHelper.scala
new file mode 100644
index 0000000000..be0eb02a1c
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkPlanHelper.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.util.reflect.DynMethods
+
+object SparkPlanHelper {
+
+ private val sparkSessionMethod = DynMethods.builder("session")
+ .impl(classOf[SparkPlan])
+ .buildChecked()
+
+ def sparkSession(sparkPlan: SparkPlan): SparkSession = {
+ sparkSessionMethod.invokeChecked[SparkSession](sparkPlan)
+ }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 04f4ede6cf..e2fb55134c 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -32,7 +32,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.CollectLimitExec
+import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlanHelper}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils
@@ -157,7 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with
Logging {
val partsToScan =
partsScanned.until(math.min(partsScanned + numPartsToTry,
totalParts))
- val sc = collectLimitExec.session.sparkContext
+ val sc = SparkPlanHelper.sparkSession(collectLimitExec).sparkContext
val res = sc.runJob(
childRDD,
(it: Iterator[InternalRow]) => {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 85cf2971e2..73e7f77993 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec,
HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec,
HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SparkPlanHelper,
SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -83,8 +83,9 @@ object SparkDatasetHelper extends Logging {
*/
def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = plan.schema
- val maxRecordsPerBatch =
plan.session.sessionState.conf.arrowMaxRecordsPerBatch
- val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone
+ val spark = SparkPlanHelper.sparkSession(plan)
+ val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+ val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
// note that, we can't pass the lazy variable `maxBatchSize` directly,
this is because input
// arguments are serialized and sent to the executor side for execution.
val maxBatchSizePerBatch = maxBatchSize
@@ -169,8 +170,9 @@ object SparkDatasetHelper extends Logging {
}
private def doCollectLimit(collectLimit: CollectLimitExec):
Array[Array[Byte]] = {
- val timeZoneId =
collectLimit.session.sessionState.conf.sessionLocalTimeZone
- val maxRecordsPerBatch =
collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch
+ val spark = SparkPlanHelper.sparkSession(collectLimit)
+ val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+ val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
val batches = KyuubiArrowConverters.takeAsArrowBatches(
collectLimit,
@@ -199,7 +201,7 @@ object SparkDatasetHelper extends Logging {
}
private def doCommandResultExec(commandResult: CommandResultExec):
Array[Array[Byte]] = {
- val spark = commandResult.session
+ val spark = SparkPlanHelper.sparkSession(commandResult)
commandResult.longMetric("numOutputRows").add(commandResult.rows.size)
sendDriverMetrics(spark.sparkContext, commandResult.metrics)
KyuubiArrowConverters.toBatchIterator(
@@ -212,7 +214,7 @@ object SparkDatasetHelper extends Logging {
}
private def doLocalTableScan(localTableScan: LocalTableScanExec):
Array[Array[Byte]] = {
- val spark = localTableScan.session
+ val spark = SparkPlanHelper.sparkSession(localTableScan)
localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
KyuubiArrowConverters.toBatchIterator(