This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 243143526c [KYUUBI #7198] Remove support for Spark 3.2
243143526c is described below
commit 243143526c47b5f31e9888abee7be057eb19d160
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Sep 12 13:49:57 2025 +0800
[KYUUBI #7198] Remove support for Spark 3.2
### Why are the changes needed?
Discussion in mailing list:
https://lists.apache.org/thread/qrbknzjj3jcwjc9nd64qmtohrwfk1kjp
Spark 3.2 reached EOL in April 2023, and Kyuubi deprecated support for Spark 3.2 in 1.10 (https://github.com/apache/kyuubi/pull/6545).
This PR proposes to fully drop support for Spark 3.2.
### How was this patch tested?
Pass GHA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7198 from pan3793/remove-spark-3.2.
Closes #7198
7d7e21ca4 [Cheng Pan] address comments
219fb7914 [Cheng Pan] fix
b4c8ae796 [Cheng Pan] fix
6ffd9c2fe [Cheng Pan] address comments
1d0dc2d85 [Cheng Pan] Update extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
343b65849 [Cheng Pan] Remove support of Spark 3.2
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.github/workflows/master.yml | 5 --
docs/deployment/migration-guide.md | 1 +
docs/quick_start/quick_start.rst | 2 +-
extensions/spark/kyuubi-spark-authz/README.md | 4 +-
.../src/main/resources/table_command_spec.json | 27 --------
.../plugin/spark/authz/PrivilegesBuilder.scala | 19 +-----
.../plugin/spark/authz/rule/Authorization.scala | 9 +--
.../rowfilter/FilterDataSourceV2Strategy.scala | 9 +--
.../plugin/spark/authz/util/AuthZUtils.scala | 2 -
.../spark/authz/PrivilegesBuilderSuite.scala | 51 ++--------------
.../plugin/spark/authz/SparkSessionProvider.scala | 7 +--
.../spark/authz/V2CommandsPrivilegesSuite.scala | 21 -------
.../plugin/spark/authz/gen/TableCommands.scala | 19 +++---
.../DeltaCatalogRangerSparkExtensionSuite.scala | 22 +------
.../HudiCatalogRangerSparkExtensionSuite.scala | 21 +++----
.../IcebergCatalogRangerSparkExtensionSuite.scala | 2 -
.../PaimonCatalogRangerSparkExtensionSuite.scala | 71 +++++++++++-----------
.../authz/ranger/RangerSparkExtensionSuite.scala | 62 ++++++++-----------
...JdbcTableCatalogRangerSparkExtensionSuite.scala | 4 --
.../spark/connector/tpcds/TPCDSCatalogSuite.scala | 6 +-
.../spark/connector/tpch/TPCHCatalogSuite.scala | 6 +-
extensions/spark/kyuubi-spark-lineage/README.md | 2 +-
.../helper/SparkSQLLineageParseHelper.scala | 20 ++----
.../atlas/AtlasLineageDispatcherSuite.scala | 5 +-
.../events/OperationLineageEventSuite.scala | 5 +-
.../helper/RowLevelCatalogLineageParserSuite.scala | 3 +-
.../helper/SparkSQLLineageParserHelperSuite.scala | 6 +-
.../helper/TableCatalogLineageParserSuite.scala | 4 --
.../kyuubi/engine/spark/SparkSQLEngine.scala | 3 -
.../execution/arrow/KyuubiArrowConverters.scala | 5 +-
.../spark/operation/SparkOperationSuite.scala | 8 +--
.../kyuubi/SparkSQLEngineDeregisterSuite.scala | 23 ++-----
.../kyuubi/operation/IcebergMetadataTests.scala | 9 +--
.../kyuubi/operation/SparkDataTypeTests.scala | 44 ++------------
34 files changed, 123 insertions(+), 384 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 4bc51d1a38..04d78d2e19 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -60,11 +60,6 @@ jobs:
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
- - java: 8
- spark: '3.5'
- spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
- exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
- comment: 'verify-on-spark-3.2-binary'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'
diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md
index 03d0093f3d..faa66d40b8 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -19,6 +19,7 @@
## Upgrading from Kyuubi 1.10 to 1.11
+* Since Kyuubi 1.11, the support of the Spark engine for Spark 3.2 is removed.
* Since Kyuubi 1.11, the support of the Flink engine for Flink 1.17 and 1.18 is deprecated, and will be removed in the future.
* Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by the Kyuubi Spark extension is removed; consider using `kyuubi.operation.result.max.rows` instead. Note that the latter works without installing the Kyuubi Spark extension.
* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for engine completion. If the engine is running in client mode, Kyuubi will always wait for engine completion. For the Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.sess [...]
diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst
index f638dd9cba..8cb1162d00 100644
--- a/docs/quick_start/quick_start.rst
+++ b/docs/quick_start/quick_start.rst
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
- **Spark** Engine 3.2 to 3.5, 4.0 A Spark distribution
+ **Spark** Engine 3.3 to 3.5, 4.0 A Spark distribution
**Flink** Engine 1.17 to 1.20 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md
index d866d0df4b..c63da2fcf4 100644
--- a/extensions/spark/kyuubi-spark-authz/README.md
+++ b/extensions/spark/kyuubi-spark-authz/README.md
@@ -37,8 +37,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver
- [x] 3.5.x (default)
- [x] 3.4.x
- [x] 3.3.x
-- [x] 3.2.x
-- [x] 3.1.x
+- [ ] 3.2.x
+- [ ] 3.1.x
- [ ] 3.0.x
- [ ] 2.4.x and earlier
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 7ce5591525..1e6e10380a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -840,17 +840,6 @@
"isInput" : true,
"comment" : ""
} ]
-}, {
- "classname" : "org.apache.spark.sql.execution.command.AddFileCommand",
- "tableDescs" : [ ],
- "opType" : "ADD",
- "queryDescs" : [ ],
- "uriDescs" : [ {
- "fieldName" : "path",
- "fieldExtractor" : "StringURIExtractor",
- "isInput" : true,
- "comment" : ""
- } ]
}, {
"classname" : "org.apache.spark.sql.execution.command.AddFilesCommand",
"tableDescs" : [ ],
@@ -969,22 +958,6 @@
"opType" : "ALTERTABLE_DROPPARTS",
"queryDescs" : [ ],
"uriDescs" : [ ]
-}, {
- "classname" :
"org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand",
- "tableDescs" : [ {
- "fieldName" : "tableName",
- "fieldExtractor" : "TableIdentifierTableExtractor",
- "columnDesc" : null,
- "actionTypeDesc" : null,
- "tableTypeDesc" : null,
- "catalogDesc" : null,
- "isInput" : false,
- "setCurrentDatabaseIfMissing" : false,
- "comment" : ""
- } ],
- "opType" : "MSCK",
- "queryDescs" : [ ],
- "uriDescs" : [ ]
}, {
"classname" :
"org.apache.spark.sql.execution.command.AlterTableRenameCommand",
"tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 01266eb2c8..f186940c2b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -224,24 +224,7 @@ object PrivilegesBuilder {
}
}
spec.queries(plan).foreach { p =>
- if (p.resolved) {
- buildQuery(Project(p.output, p), inputObjs, spark = spark)
- } else {
- try {
- // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
- // Before this pr, we just ignore it, now we support this.
- val analyzed = spark.sessionState.analyzer.execute(p)
- buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
- } catch {
- case e: Exception =>
- LOG.debug(
- s"""
- |Failed to analyze unresolved
- |$p
- |due to ${e.getMessage}""".stripMargin,
- e)
- }
- }
+ buildQuery(Project(p.output, p), inputObjs, spark = spark)
}
spec.operationType
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala
index d682b71d92..dcd534bb0b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.rule
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -52,12 +52,7 @@ object Authorization {
def markAuthChecked(plan: LogicalPlan): LogicalPlan = {
plan.setTagValue(KYUUBI_AUTHZ_TAG, ())
- plan transformDown {
- // TODO: Add this line Support for spark3.1, we can remove this
- // after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269
- case view: View =>
- markAllNodesAuthChecked(view.child)
- }
+ plan
}
protected def isAuthChecked(plan: LogicalPlan): Boolean = {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala
index e268ed6bc7..18e2342e36 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala
@@ -17,18 +17,11 @@
package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- // For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to
- // `Project`
- case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" =>
- spark.sessionState.planner.plan(child)
- .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
-
- // For Spark 3.2 and above
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
spark.sessionState.planner.plan(child)
.map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 1c2ad5f5d3..de567d439f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -84,8 +84,6 @@ private[authz] object AuthZUtils {
}
lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
- lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
- lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
lazy val isSparkV40OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "4.0"
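Aside: the surviving flags above are simple semantic-version gates; with Spark 3.3 as the new floor, the `isSparkV32OrGreater`/`isSparkV33OrGreater` gates are always true and can be deleted outright. A minimal, self-contained sketch of the pattern (illustrative names only; Kyuubi's real `SemanticVersion` utility is richer than this):

```scala
// Illustrative stand-in for the version-gate pattern kept in AuthZUtils:
// parse "major.minor[.patch]" and compare (major, minor) pairs.
final case class SemVer(major: Int, minor: Int) {
  def >=(other: SemVer): Boolean =
    major > other.major || (major == other.major && minor >= other.minor)
  def >=(v: String): Boolean = this >= SemVer.parse(v)
}

object SemVer {
  def parse(v: String): SemVer = {
    val parts = v.split('.')
    SemVer(parts(0).takeWhile(_.isDigit).toInt, parts(1).takeWhile(_.isDigit).toInt)
  }
}

object VersionGateSketch extends App {
  val runtime = SemVer.parse("3.4.1") // stand-in for org.apache.spark.SPARK_VERSION
  println(s"3.4+: ${runtime >= "3.4"}, 4.0+: ${runtime >= "4.0"}") // true, false
}
```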
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 63837f2501..64faa2bc15 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -244,41 +244,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(accessType === AccessType.ALTER)
}
- test("AlterTableRecoverPartitionsCommand") {
- // AlterTableRecoverPartitionsCommand exists in the version below 3.2
- assume(!isSparkV32OrGreater)
- val tableName = reusedDb + "." + "TableToMsck"
- withTable(tableName) { _ =>
- sql(
- s"""
- |CREATE TABLE $tableName
- |(key int, value string, pid string)
- |USING parquet
- |PARTITIONED BY (pid)""".stripMargin)
- val sqlStr =
- s"""
- |MSCK REPAIR TABLE $tableName
- |""".stripMargin
- val plan = sql(sqlStr).queryExecution.analyzed
- val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
- assert(operationType === MSCK)
- assert(inputs.isEmpty)
-
- assert(outputs.size === 1)
- outputs.foreach { po =>
- assert(po.actionType === PrivilegeObjectActionType.OTHER)
- assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
- assert(po.catalog.isEmpty)
- assertEqualsIgnoreCase(reusedDb)(po.dbname)
- assertEqualsIgnoreCase(tableName.split("\\.").last)(po.objectName)
- assert(po.columns.isEmpty)
- checkTableOwner(po)
- val accessType = ranger.AccessType(po, operationType, isInput = false)
- assert(accessType === AccessType.ALTER)
- }
- }
- }
-
// ALTER TABLE default.StudentInfo PARTITION (age='10') RENAME TO PARTITION (age='15');
test("AlterTableRenamePartitionCommand") {
sql(s"ALTER TABLE $reusedPartTable ADD IF NOT EXISTS PARTITION (pid=1)")
@@ -367,11 +332,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
- if (isSparkV32OrGreater) {
- // Query in AlterViewAsCommand can not be resolved before SPARK-34698
- assert(po0.columns === Seq("key", "pid", "value"))
- checkTableOwner(po0)
- }
+ assert(po0.columns === Seq("key", "pid", "value"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -482,7 +444,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("AnalyzeTablesCommand") {
- assume(isSparkV32OrGreater)
val plan = sql(s"ANALYZE TABLES IN $reusedDb COMPUTE STATISTICS")
.queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@@ -626,7 +587,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
- val db = if (isSparkV33OrGreater) defaultDb else null
+ val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("CreateFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -658,7 +619,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
- val db = if (isSparkV33OrGreater) defaultDb else null
+ val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("DropFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -678,7 +639,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
assert(po.catalog.isEmpty)
- val db = if (isSparkV33OrGreater) defaultDb else null
+ val db = defaultDb
assertEqualsIgnoreCase(db)(po.dbname)
assertEqualsIgnoreCase("RefreshFunctionCommand")(po.objectName)
assert(po.columns.isEmpty)
@@ -927,8 +888,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("RepairTableCommand") {
- // only spark 3.2 or greater has RepairTableCommand
- assume(isSparkV32OrGreater)
val tableName = reusedDb + "." + "TableToRepair"
withTable(tableName) { _ =>
sql(
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
index 1fed6bc8b8..f3ecc90f12 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -29,7 +29,6 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
trait SparkSessionProvider {
protected val catalogImpl: String
@@ -100,7 +99,7 @@ trait SparkSessionProvider {
case (t, "table") => doAs(
admin, {
val purgeOption =
- if (isSparkV32OrGreater && isCatalogSupportPurge(
+ if (isCatalogSupportPurge(
spark.sessionState.catalogManager.currentCatalog.name())) {
"PURGE"
} else ""
@@ -109,9 +108,7 @@ trait SparkSessionProvider {
case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS
$db"))
case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS
$fn"))
case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
- case (cacheTable, "cache") => if (isSparkV32OrGreater) {
- doAs(admin, sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
- }
+ case (cacheTable, "cache") => doAs(admin, sql(s"UNCACHE TABLE IF
EXISTS $cacheTable"))
case (_, e) =>
throw new RuntimeException(s"the resource whose resource type is $e
cannot be cleared")
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
index 1b6e07b77d..40cfc38774 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -228,7 +228,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("UpdateTable") {
- assume(isSparkV32OrGreater)
assume(supportsUpdateTable)
val plan = executePlan(s"UPDATE $catalogTable SET value = 'a' WHERE key =
0").analyzed
@@ -315,7 +314,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
test("AddPartitions") {
assume(supportsPartitionManagement)
- assume(isSparkV32OrGreater)
val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
s"ADD PARTITION (dt='2022-01-01')").analyzed
@@ -337,7 +335,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
test("DropPartitions") {
assume(supportsPartitionManagement)
- assume(isSparkV32OrGreater)
val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
s"DROP PARTITION (dt='2022-01-01')").analyzed
@@ -359,7 +356,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
test("RenamePartitions") {
assume(supportsPartitionManagement)
- assume(isSparkV32OrGreater)
val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
s"PARTITION (dt='2022-01-01') RENAME TO PARTITION
(dt='2022-01-02')").analyzed
@@ -381,7 +377,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
test("TruncatePartition") {
assume(supportsPartitionManagement)
- assume(isSparkV32OrGreater)
val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
s"PARTITION (dt='2022-01-01') RENAME TO PARTITION
(dt='2022-01-02')").analyzed
@@ -485,7 +480,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
test("RepairTable") {
assume(supportsPartitionGrammar)
- assume(isSparkV32OrGreater)
val plan = executePlan(s"MSCK REPAIR TABLE $catalogPartTable").analyzed
@@ -506,7 +500,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("TruncateTable") {
- assume(isSparkV32OrGreater)
val plan = executePlan(s"TRUNCATE TABLE $catalogTable").analyzed
@@ -547,7 +540,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
// with V2AlterTableCommand
test("AddColumns") {
- assume(isSparkV32OrGreater)
val table = "AddColumns"
withV2Table(table) { tableId =>
@@ -572,8 +564,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("AlterColumn") {
- assume(isSparkV32OrGreater)
-
val table = "AlterColumn"
withV2Table(table) { tableId =>
sql(s"CREATE TABLE $tableId (i int)")
@@ -597,8 +587,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("DropColumns") {
- assume(isSparkV32OrGreater)
-
val table = "DropColumns"
withV2Table(table) { tableId =>
sql(s"CREATE TABLE $tableId (i int, j int)")
@@ -622,8 +610,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("ReplaceColumns") {
- assume(isSparkV32OrGreater)
-
val table = "ReplaceColumns"
withV2Table(table) { tableId =>
sql(s"CREATE TABLE $tableId (i int, j int)")
@@ -647,8 +633,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("RenameColumn") {
- assume(isSparkV32OrGreater)
-
val table = "RenameColumn"
withV2Table(table) { tableId =>
sql(s"CREATE TABLE $tableId (i int, j int)")
@@ -687,7 +671,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("SetNamespaceProperties") {
- assume(isSparkV33OrGreater)
val plan = sql("ALTER DATABASE default SET DBPROPERTIES (abc =
'123')").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
assertResult(plan.getClass.getName)(
@@ -705,7 +688,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("CreateNamespace") {
- assume(isSparkV33OrGreater)
withDatabase("CreateNamespace") { db =>
val plan = sql(s"CREATE DATABASE $db").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@@ -727,7 +709,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("SetNamespaceLocation") {
- assume(isSparkV33OrGreater)
// hive does not support altering database location
assume(catalogImpl !== "hive")
val newLoc = spark.conf.get("spark.sql.warehouse.dir") + "/new_db_location"
@@ -760,7 +741,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("DescribeNamespace") {
- assume(isSparkV33OrGreater)
val plan = sql(s"DESC DATABASE $reusedDb").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
assertResult(plan.getClass.getName)(
@@ -781,7 +761,6 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
}
test("DropNamespace") {
- assume(isSparkV33OrGreater)
withDatabase("DropNameSpace") { db =>
sql(s"CREATE DATABASE $db")
val plan = sql(s"DROP DATABASE DropNameSpace").queryExecution.analyzed
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 3c08ad4ab6..77d8057c02 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -132,13 +132,12 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
TableCommandSpec(cmd, Seq(oldTableD), ALTERTABLE_RENAME)
}
- // this is for spark 3.1 or below
- val AlterTableRecoverPartitions = {
- val cmd = "org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand"
+ val RepairTable = {
+ val cmd = "org.apache.spark.sql.execution.command.RepairTableCommand"
TableCommandSpec(cmd, Seq(tableNameDesc), MSCK)
}
- val RepairTable = {
+ val RepairTableV2 = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.RepairTable"
TableCommandSpec(cmd, Seq(resolvedTableDesc), MSCK)
}
@@ -667,9 +666,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
TableCommandSpec(cmd, Nil, ADD, uriDescs = Seq(uriDesc))
}
- // For spark-3.1
- val AddFileCommand = {
- val cmd = "org.apache.spark.sql.execution.command.AddFileCommand"
+ val AddJarCommand = {
+ val cmd = "org.apache.spark.sql.execution.command.AddJarCommand"
val uriDesc = UriDesc("path", classOf[StringURIExtractor], isInput = true)
TableCommandSpec(cmd, Nil, ADD, uriDescs = Seq(uriDesc))
}
@@ -678,8 +676,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
AddArchivesCommand,
AddArchivesCommand.copy(classname = "org.apache.spark.sql.execution.command.AddFilesCommand"),
AddArchivesCommand.copy(classname = "org.apache.spark.sql.execution.command.AddJarsCommand"),
- AddFileCommand,
- AddFileCommand.copy(classname = "org.apache.spark.sql.execution.command.AddJarCommand"),
+ AddJarCommand,
AddPartitions,
DropPartitions,
RenamePartitions,
@@ -696,9 +693,6 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
AlterTableChangeColumn,
AlterTableDropPartition,
AlterTableRename,
- AlterTableRecoverPartitions,
- AlterTableRecoverPartitions.copy(classname =
- "org.apache.spark.sql.execution.command.RepairTableCommand"),
AlterTableRenamePartition,
AlterTableSerDeProperties,
AlterTableSetLocation,
@@ -749,6 +743,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
OverwriteByExpression,
OverwritePartitionsDynamic,
RepairTable,
+ RepairTableV2,
RefreshTable,
RefreshTableV2,
RefreshTable3d0,
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index dbf88d7d02..595f243a0f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -25,7 +25,7 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.{isSparkV32OrGreater, isSparkV35OrGreater}
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkV35OrGreater
import org.apache.kyuubi.tags.DeltaTest
import org.apache.kyuubi.util.AssertionUtils._
@@ -295,8 +295,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("optimize table") {
- assume(isSparkV32OrGreater, "optimize table is available in Delta Lake
1.2.0 and above")
-
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"),
(s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
@@ -446,8 +444,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("optimize path-based table") {
- assume(isSparkV32OrGreater, "optimize table is available in Delta Lake
1.2.0 and above")
-
withTempDir(path => {
doAs(admin, sql(createPathBasedTableSql(path)))
val optimizeTableSql1 = s"OPTIMIZE delta.`$path`"
@@ -517,10 +513,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("alter path-based table drop column") {
- assume(
- isSparkV32OrGreater,
- "alter table drop column is available in Delta Lake 1.2.0 and above")
-
withTempDir(path => {
doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
val dropColumnSql = s"ALTER TABLE delta.`$path` DROP COLUMN birthDate"
@@ -532,10 +524,6 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("alter path-based table rename column") {
- assume(
- isSparkV32OrGreater,
- "alter table rename column is available in Delta Lake 1.2.0 and above")
-
withTempDir(path => {
doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
val renameColumnSql = s"ALTER TABLE delta.`$path`" +
@@ -548,11 +536,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("alter path-based table replace columns") {
- withTempDir(path => {
- assume(
- isSparkV32OrGreater,
- "alter table replace columns is not available in Delta Lake 1.0.1")
-
+ withTempDir { path =>
doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
val replaceColumnsSql = s"ALTER TABLE delta.`$path`" +
s" REPLACE COLUMNS (id INT, name STRING, gender STRING)"
@@ -567,7 +551,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
if (isSparkV35OrGreater) {
doAs(admin, sql(replaceColumnsSql))
}
- })
+ }
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index 8852aec1db..52ec46e5e3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -22,7 +22,6 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.tags.HudiTest
import org.apache.kyuubi.util.AssertionUtils.interceptEndsWith
@@ -48,15 +47,13 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
val index1 = "table_hoodie_index1"
override def beforeAll(): Unit = {
- if (isSparkV32OrGreater) {
- spark.conf.set(
- s"spark.sql.catalog.$sparkCatalog",
- "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
- spark.conf.set(s"spark.sql.catalog.$sparkCatalog.type", "hadoop")
- spark.conf.set(
- s"spark.sql.catalog.$sparkCatalog.warehouse",
- Utils.createTempDir("hudi-hadoop").toString)
- }
+ spark.conf.set(
+ s"spark.sql.catalog.$sparkCatalog",
+ "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ spark.conf.set(s"spark.sql.catalog.$sparkCatalog.type", "hadoop")
+ spark.conf.set(
+ s"spark.sql.catalog.$sparkCatalog.warehouse",
+ Utils.createTempDir("hudi-hadoop").toString)
super.beforeAll()
}
@@ -549,9 +546,7 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("IndexBasedCommand") {
- assume(
- !isSparkV33OrGreater,
- "Hudi index creation not supported on Spark 3.3 or greater currently")
+ assume(false, "Hudi index creation not supported on Spark 3.3 or greater currently")
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (namespace1, "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index 16a8beb22c..b72e82c170 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -207,8 +207,6 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
}
test("KYUUBI #4047 MergeIntoIcebergTable with row filter") {
- assume(isSparkV32OrGreater)
-
val outputTable2 = "outputTable2"
withCleanTmpResources(Seq(
(s"$catalogV2.default.src", "table"),
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index efaf28df8e..44249e58a4 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -420,46 +420,43 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("Batch Time Travel") {
- // Batch Time Travel requires Spark 3.3+
- if (isSparkV33OrGreater) {
- withCleanTmpResources(Seq(
- (s"$catalogV2.$namespace1.$table1", "table"))) {
- val createTable = createTableSql(namespace1, table1)
- doAs(admin, sql(createTable))
- val insertSql =
- s"""
- |INSERT INTO $catalogV2.$namespace1.$table1 VALUES
- |(1, "a"), (2, "b");
- |""".stripMargin
- doAs(admin, sql(insertSql))
+ withCleanTmpResources(Seq(
+ (s"$catalogV2.$namespace1.$table1", "table"))) {
+ val createTable = createTableSql(namespace1, table1)
+ doAs(admin, sql(createTable))
+ val insertSql =
+ s"""
+ |INSERT INTO $catalogV2.$namespace1.$table1 VALUES
+ |(1, "a"), (2, "b");
+ |""".stripMargin
+ doAs(admin, sql(insertSql))
- val querySnapshotVersionSql =
- s"""
- |SELECT id from $catalogV2.$namespace1.$table1 VERSION AS OF 1
- |""".stripMargin
- doAs(table1OnlyUserForNs, sql(querySnapshotVersionSql).collect())
- interceptEndsWith[AccessControlException] {
- doAs(someone, sql(querySnapshotVersionSql).collect())
- }(s"does not have [select] privilege on [$namespace1/$table1/id]")
- doAs(admin, sql(querySnapshotVersionSql).collect())
+ val querySnapshotVersionSql =
+ s"""
+ |SELECT id from $catalogV2.$namespace1.$table1 VERSION AS OF 1
+ |""".stripMargin
+ doAs(table1OnlyUserForNs, sql(querySnapshotVersionSql).collect())
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(querySnapshotVersionSql).collect())
+ }(s"does not have [select] privilege on [$namespace1/$table1/id]")
+ doAs(admin, sql(querySnapshotVersionSql).collect())
- val batchTimeTravelTimestamp =
- doAs(
- admin,
- sql(s"SELECT commit_time FROM
$catalogV2.$namespace1.`$table1$$snapshots`" +
- s" ORDER BY commit_time ASC LIMIT
1").collect()(0).getTimestamp(0))
+ val batchTimeTravelTimestamp =
+ doAs(
+ admin,
+ sql(s"SELECT commit_time FROM
$catalogV2.$namespace1.`$table1$$snapshots`" +
+ s" ORDER BY commit_time ASC LIMIT 1").collect()(0).getTimestamp(0))
- val queryWithTimestamp =
- s"""
- |SELECT id FROM $catalogV2.$namespace1.$table1
- |TIMESTAMP AS OF '$batchTimeTravelTimestamp'
- |""".stripMargin
- doAs(table1OnlyUserForNs, sql(queryWithTimestamp).collect())
- interceptEndsWith[AccessControlException] {
- doAs(someone, sql(queryWithTimestamp).collect())
- }(s"does not have [select] privilege on [$namespace1/$table1/id]")
- doAs(admin, sql(queryWithTimestamp).collect())
- }
+ val queryWithTimestamp =
+ s"""
+ |SELECT id FROM $catalogV2.$namespace1.$table1
+ |TIMESTAMP AS OF '$batchTimeTravelTimestamp'
+ |""".stripMargin
+ doAs(table1OnlyUserForNs, sql(queryWithTimestamp).collect())
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(queryWithTimestamp).collect())
+ }(s"does not have [select] privilege on [$namespace1/$table1/id]")
+ doAs(admin, sql(queryWithTimestamp).collect())
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 1fdea0ed96..342bdf0e03 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -680,37 +680,34 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("[KYUUBI #3411] skip checking cache table") {
- if (isSparkV32OrGreater) { // cache table sql supported since 3.2.0
-
- val db1 = defaultDb
- val srcTable1 = "hive_src1"
- val cacheTable1 = "cacheTable1"
- val cacheTable2 = "cacheTable2"
- val cacheTable3 = "cacheTable3"
- val cacheTable4 = "cacheTable4"
-
- withCleanTmpResources(Seq(
- (s"$db1.$srcTable1", "table"),
- (s"$db1.$cacheTable1", "cache"),
- (s"$db1.$cacheTable2", "cache"),
- (s"$db1.$cacheTable3", "cache"),
- (s"$db1.$cacheTable4", "cache"))) {
+ val db1 = defaultDb
+ val srcTable1 = "hive_src1"
+ val cacheTable1 = "cacheTable1"
+ val cacheTable2 = "cacheTable2"
+ val cacheTable3 = "cacheTable3"
+ val cacheTable4 = "cacheTable4"
- doAs(
- admin,
- sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" +
- s" (id int, name string, city string)"))
-
- withSingleCallEnabled {
- val e1 = intercept[AccessControlException](
- doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from
$db1.$srcTable1")))
- assert(
- e1.getMessage.contains(s"does not have [select] privilege on " +
- s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]"))
- }
- doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
- doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
+ withCleanTmpResources(Seq(
+ (s"$db1.$srcTable1", "table"),
+ (s"$db1.$cacheTable1", "cache"),
+ (s"$db1.$cacheTable2", "cache"),
+ (s"$db1.$cacheTable3", "cache"),
+ (s"$db1.$cacheTable4", "cache"))) {
+
+ doAs(
+ admin,
+ sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" +
+ s" (id int, name string, city string)"))
+
+ withSingleCallEnabled {
+ val e1 = intercept[AccessControlException](
+ doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from
$db1.$srcTable1")))
+ assert(
+ e1.getMessage.contains(s"does not have [select] privilege on " +
+ s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]"))
}
+ doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
+ doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
}
}
@@ -995,7 +992,6 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
test("[KYUUBI #5503][AUTHZ] Check plan auth checked should not set tag to
all child nodes") {
- assume(isSparkV32OrGreater, "Spark 3.1 not support lateral subquery.")
val db1 = defaultDb
val table1 = "table1"
val table2 = "table2"
@@ -1187,11 +1183,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
test("Add resource command") {
withTempDir { path =>
withSingleCallEnabled {
- val supportedCommand = if (isSparkV32OrGreater) {
- Seq("JAR", "FILE", "ARCHIVE")
- } else {
- Seq("JAR", "FILE")
- }
+ val supportedCommand = Seq("JAR", "FILE", "ARCHIVE")
supportedCommand.foreach { cmd =>
interceptEndsWith[AccessControlException](
doAs(someone, sql(s"ADD $cmd $path")))(
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
index 49d51e5d24..f7e886c30c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
@@ -238,8 +238,6 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
}
test("[KYUUBI #3424] TRUNCATE TABLE") {
- assume(isSparkV32OrGreater)
-
val e1 = intercept[AccessControlException](
doAs(
someone,
@@ -249,8 +247,6 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
}
test("[KYUUBI #3424] MSCK REPAIR TABLE") {
- assume(isSparkV32OrGreater)
-
val e1 = intercept[AccessControlException](
doAs(
someone,
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
index 0eed970a4c..844ce1b9ee 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
class TPCDSCatalogSuite extends KyuubiFunSuite {
@@ -91,10 +90,7 @@ class TPCDSCatalogSuite extends KyuubiFunSuite {
def assertStats(tableName: String, sizeInBytes: BigInt, rowCount: BigInt): Unit = {
val stats = spark.table(tableName).queryExecution.analyzed.stats
assert(stats.sizeInBytes == sizeInBytes)
- // stats.rowCount only has value after SPARK-33954
- if (SPARK_RUNTIME_VERSION >= "3.2") {
- assert(stats.rowCount.contains(rowCount), tableName)
- }
+ assert(stats.rowCount.contains(rowCount), tableName)
}
assertStats("tpcds.sf1.call_center", 1830, 6)
diff --git a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
index 14415141e6..117bc9dd13 100644
--- a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
class TPCHCatalogSuite extends KyuubiFunSuite {
@@ -129,10 +128,7 @@ class TPCHCatalogSuite extends KyuubiFunSuite {
def assertStats(tableName: String, sizeInBytes: BigInt, rowCount: BigInt): Unit = {
val stats = spark.table(tableName).queryExecution.analyzed.stats
assert(stats.sizeInBytes == sizeInBytes)
- // stats.rowCount only has value after SPARK-33954
- if (SPARK_RUNTIME_VERSION >= "3.2") {
- assert(stats.rowCount.contains(rowCount), tableName)
- }
+ assert(stats.rowCount.contains(rowCount), tableName)
}
assertStats("tpch.sf1.customer", 26850000, 150000)
assertStats("tpch.sf1.orders", 156000000, 1500000)
diff --git a/extensions/spark/kyuubi-spark-lineage/README.md b/extensions/spark/kyuubi-spark-lineage/README.md
index 6b3eeb902b..75c995d573 100644
--- a/extensions/spark/kyuubi-spark-lineage/README.md
+++ b/extensions/spark/kyuubi-spark-lineage/README.md
@@ -26,7 +26,7 @@
## Build
```shell
-build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.5.1
+build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.5.6
```
### Supported Apache Spark Versions
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 27d74aa173..dc6673d4cf 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -265,20 +265,10 @@ trait LineageParser {
case p
if p.nodeName == "CreateTableAsSelect" ||
p.nodeName == "ReplaceTableAsSelect" =>
- val (table, namespace, catalog) =
- if (SPARK_RUNTIME_VERSION <= "3.2") {
- (
- getField[Identifier](plan, "tableName").name,
- getField[Identifier](plan, "tableName").namespace.mkString("."),
- getField[TableCatalog](plan, "catalog").name())
- } else {
- (
- invokeAs[Identifier](plan, "tableName").name(),
- invokeAs[Identifier](plan, "tableName").namespace().mkString("."),
- getField[CatalogPlugin](
- invokeAs[LogicalPlan](plan, "name"),
- "catalog").name())
- }
+ val (table, namespace, catalog) = (
+ invokeAs[Identifier](plan, "tableName").name(),
+ invokeAs[Identifier](plan, "tableName").namespace().mkString("."),
+ getField[CatalogPlugin](invokeAs[LogicalPlan](plan, "name"),
"catalog").name())
extractColumnsLineage(
getQuery(plan),
parentColumnsLineage,
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
index 8e8d18f216..03ee36e41f 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
@@ -33,12 +33,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{buildColumnQualifiedName, buildTableQualifiedName, COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
-import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
- val catalogName =
- if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog"
- else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+ val catalogName = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index 378eb3bb46..95c3caa63f 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -28,13 +28,10 @@ import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.{OperationLineageKyuubiEvent, OperationLineageSparkEvent}
-import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
- val catalogName =
- if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog"
- else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+ val catalogName = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
index 8af5b0f179..966fb70888 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
@@ -22,9 +22,8 @@ import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME
class RowLevelCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite {
- override def catalogName: String = {
+ override def catalogName: String =
"org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog"
- }
test("columns lineage extract - WriteDelta") {
assume(
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 380b3eee4f..90da4650b1 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -29,15 +29,11 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
-import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
abstract class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
with SparkListenerExtensionTest {
- def catalogName: String = {
- if (SPARK_RUNTIME_VERSION <= "3.3") "org.apache.spark.sql.connector.InMemoryTableCatalog"
- else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
- }
+ def catalogName: String = "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG
override protected val catalogImpl: String = "hive"
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
index c9724c3fec..7a58af2077 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
@@ -21,10 +21,6 @@ import org.apache.kyuubi.plugin.lineage.Lineage
class TableCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite {
- override def catalogName: String = {
- "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
- }
-
test("columns lineage extract - MergeIntoTable") {
val ddls =
"""
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 02d2a7afb5..9151fe0a26 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -383,9 +383,6 @@ object SparkSQLEngine extends Logging {
}
def main(args: Array[String]): Unit = {
- if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.2") {
- warn("The support for Spark 3.2 is deprecated, and will be removed in
the next version.")
- }
val startedTime = System.currentTimeMillis()
val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
case Some(t) => t.toLong
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index e2fb55134c..e13653b01c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -302,7 +302,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
MessageSerializer.serialize(writeChannel, batch)
// Always write the Ipc options at the end.
- ArrowStreamWriter.writeEndOfStream(writeChannel, ARROW_IPC_OPTION_DEFAULT)
+ ArrowStreamWriter.writeEndOfStream(writeChannel, IpcOption.DEFAULT)
batch.close()
} {
@@ -343,7 +343,4 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
errorOnDuplicatedFieldNames,
largeVarTypes)
}
-
- // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.2
- final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
}
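The deleted shim existed only because `IpcOption.DEFAULT` arrived in Arrow 4.0.0 (ARROW-11081) and Spark 3.2 shipped an older Arrow. A minimal sketch of ending an Arrow IPC stream with the constant now used directly (assumes Arrow Java 4.0.0+ on the classpath; names are illustrative):

```scala
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels

import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.ipc.WriteChannel
import org.apache.arrow.vector.ipc.message.IpcOption

object EndOfStreamSketch extends App {
  val out = new ByteArrayOutputStream()
  val channel = new WriteChannel(Channels.newChannel(out))
  // Writes the IPC end-of-stream marker; IpcOption.DEFAULT replaces the
  // private ARROW_IPC_OPTION_DEFAULT shim removed above.
  ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
  println(s"end-of-stream marker: ${out.size()} bytes")
}
```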
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index a5c911ff37..75ad51885a 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -91,12 +91,8 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
.add("c15", "struct<X: bigint,Y: double>", nullable = true, "15")
.add("c16", "binary", nullable = false, "16")
.add("c17", "struct<X: string>", nullable = true, "17")
-
- // since spark3.3.0
- if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
- schema = schema.add("c18", "interval day", nullable = true, "18")
- .add("c19", "interval year", nullable = true, "19")
- }
+ .add("c18", "interval day", nullable = true, "18")
+ .add("c19", "interval year", nullable = true, "19")
// since spark3.4.0
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.4") {
schema = schema.add("c20", "timestamp_ntz", nullable = true, "20")
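
The interval columns were gated on 3.3 per the removed comment; with 3.3 the minimum, they are added unconditionally while the 3.4-only timestamp_ntz columns keep their guard. A tiny standalone sketch (hypothetical snippet, not the suite's full fixture) of the same construction:

import org.apache.spark.sql.types.StructType

// StructType.add accepts DDL-style type strings plus a column comment,
// so the ANSI interval columns can be declared directly on Spark 3.3+.
val schema = new StructType()
  .add("c18", "interval day", nullable = true, "18")
  .add("c19", "interval year", nullable = true, "19")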
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
index ddb612ad4d..c52436d982 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkSQLEngineDeregisterSuite.scala
@@ -25,7 +25,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.{WithDiscoverySparkSQLEngine, WithEmbeddedZookeeper}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.service.ServiceState
abstract class SparkSQLEngineDeregisterSuite
@@ -61,15 +60,9 @@ abstract class SparkSQLEngineDeregisterSuite
class SparkSQLEngineDeregisterExceptionSuite extends SparkSQLEngineDeregisterSuite {
override def withKyuubiConf: Map[String, String] = {
- super.withKyuubiConf ++ Map(ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
- if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
- // see https://issues.apache.org/jira/browse/SPARK-35958
- "org.apache.spark.SparkArithmeticException"
- } else {
- classOf[ArithmeticException].getCanonicalName
- }
- })
-
+ super.withKyuubiConf ++
+ Map(ENGINE_DEREGISTER_EXCEPTION_CLASSES.key ->
+ classOf[SparkArithmeticException].getCanonicalName)
}
}
@@ -95,14 +88,8 @@ class SparkSQLEngineDeregisterExceptionTTLSuite
super.withKyuubiConf ++
zookeeperConf ++ Map(
ANSI_ENABLED.key -> "true",
- ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
- if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
- // see https://issues.apache.org/jira/browse/SPARK-35958
- "org.apache.spark.SparkArithmeticException"
- } else {
- classOf[ArithmeticException].getCanonicalName
- }
- },
+ ENGINE_DEREGISTER_EXCEPTION_CLASSES.key ->
+ classOf[SparkArithmeticException].getCanonicalName,
ENGINE_DEREGISTER_JOB_MAX_FAILURES.key -> maxJobFailures.toString,
ENGINE_DEREGISTER_EXCEPTION_TTL.key -> deregisterExceptionTTL.toString)
}
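
Both suites carried the same branch because SPARK-35958 (Spark 3.3) made ANSI arithmetic overflow throw org.apache.spark.SparkArithmeticException instead of java.lang.ArithmeticException. With 3.3 as the floor, only one class name is ever right. A minimal sketch, taking the conf key as a parameter rather than assuming its spelling:

import org.apache.spark.SparkArithmeticException

// Build the deregister-exception conf entry for whatever key
// ENGINE_DEREGISTER_EXCEPTION_CLASSES resolves to.
def deregisterEntry(confKey: String): (String, String) =
  confKey -> classOf[SparkArithmeticException].getCanonicalName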
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
index 814c08343d..787b229882 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import scala.collection.mutable.ListBuffer
-import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
+import org.apache.kyuubi.IcebergSuiteMixin
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.AssertionUtils._
import org.apache.kyuubi.util.SparkVersionUtil
@@ -156,12 +156,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
"map<int, bigint>",
"date",
"timestamp",
- // SPARK-37931
- if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct<X: bigint, Y: double>"
- else "struct<`X`: bigint, `Y`: double>",
+ "struct<X: bigint, Y: double>",
"binary",
- // SPARK-37931
- if (SPARK_COMPILE_VERSION >= "3.3") "struct<X: string>" else "struct<`X`: string>")
+ "struct<X: string>")
val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt }
val (colNames, _) = cols.unzip
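
SPARK-37931 (Spark 3.3) stopped backtick-quoting field names when rendering struct types, which is why the expected literals lose their version branches. Illustrative only: the two forms differ exactly by that quoting:

// Pre-3.3 rendering, with field names quoted:
val pre33 = "struct<`X`: bigint, `Y`: double>"
// The post-3.3 rendering is the same string without the backticks:
assert(pre33.replace("`", "") == "struct<X: bigint, Y: double>")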
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
index 49f6b85d89..963b8e8953 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
@@ -27,9 +27,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
def resultFormat: String = "thrift"
test("execute statement - select null") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SELECT NULL AS col")
assert(resultSet.next())
@@ -252,9 +249,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
}
test("execute statement - select daytime interval") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"interval 1 day 1 hour -60 minutes 30 seconds" ->
@@ -283,25 +277,15 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
- if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") {
- // for spark 3.1 and backwards
- assert(result === kv._2._2)
- assert(metaData.getPrecision(1) === Int.MaxValue)
- assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR)
- } else {
- assert(result === kv._2._1)
- assert(metaData.getPrecision(1) === 29)
- assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
- }
+ assert(result === kv._2._1)
+ assert(metaData.getPrecision(1) === 29)
+ assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
assert(metaData.getScale(1) === 0)
}
}
}
test("execute statement - select year/month interval") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"),
@@ -314,25 +298,15 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
- if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") {
- // for spark 3.1 and backwards
- assert(result === kv._2._2)
- assert(metaData.getPrecision(1) === Int.MaxValue)
- assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR)
- } else {
- assert(result === kv._2._1)
- assert(metaData.getPrecision(1) === 11)
- assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
- }
+ assert(result === kv._2._1)
+ assert(metaData.getPrecision(1) === 11)
+ assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
assert(metaData.getScale(1) === 0)
}
}
}
test("execute statement - select array") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT array() AS col1, array(1) AS col2, array(null) AS col3")
@@ -350,9 +324,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
}
test("execute statement - select map") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3")
@@ -370,9 +341,6 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
}
test("execute statement - select struct") {
- assume(
- resultFormat == "thrift" ||
- (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT struct('1', '2') AS col1," +