This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e2af8aba8b44 [SPARK-45627][BUILD] Fix `symbol literal is deprecated` e2af8aba8b44 is described below commit e2af8aba8b4437ed999e04f3d9cfedef294b4e66 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Mon Oct 23 22:48:41 2023 +0800 [SPARK-45627][BUILD] Fix `symbol literal is deprecated` ### What changes were proposed in this pull request? The pr aims to fix `symbol literal is deprecated` and make `symbol literal` not allowed in the code. ### Why are the changes needed? Prepare for future upgrades to Scala 3. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA - Manually test: ``` build/sbt -Phadoop-3 -Pdocker-integration-tests -Pspark-ganglia-lgpl -Pkinesis-asl -Pkubernetes -Phive-thriftserver -Pconnect -Pyarn -Phive -Phadoop-cloud -Pvolcano -Pkubernetes-integration-tests Test/package streaming-kinesis-asl-assembly/assembly connect/assembly ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43487 from panbingkun/SPARK-45627. 
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../apache/spark/sql/PlanGenerationTestSuite.scala | 6 +- .../apache/spark/sql/SQLImplicitsTestSuite.scala | 2 +- .../sql/UserDefinedFunctionE2ETestSuite.scala | 2 +- .../sql/protobuf/ProtobufFunctionsSuite.scala | 54 ++-- .../deploy/history/FsHistoryProviderSuite.scala | 2 +- pom.xml | 4 + project/SparkBuild.scala | 4 +- .../optimizer/InferWindowGroupLimitSuite.scala | 16 +- .../optimizer/MergeScalarSubqueriesSuite.scala | 281 +++++++++++---------- .../optimizer/NestedColumnAliasingSuite.scala | 20 +- .../sql/catalyst/optimizer/OptimizerSuite.scala | 8 +- .../PushProjectionThroughLimitSuite.scala | 24 +- .../catalyst/optimizer/TransposeWindowSuite.scala | 11 +- .../sql/catalyst/parser/CastingSyntaxSuite.scala | 10 +- .../sql/catalyst/plans/LogicalPlanSuite.scala | 12 +- .../spark/sql/DataFrameWindowFunctionsSuite.scala | 8 +- .../apache/spark/sql/GeneratorFunctionSuite.scala | 25 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../org/apache/spark/sql/execution/SortSuite.scala | 2 +- .../streaming/MultiStatefulOperatorsSuite.scala | 4 +- 20 files changed, 269 insertions(+), 228 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index aaa510a0676b..e4c5b851b130 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -485,7 +485,7 @@ class PlanGenerationTestSuite } test("as symbol") { - simple.as('bar) + simple.as(Symbol("bar")) } test("alias string") { simple.alias("fooz") @@ -3027,7 +3027,7 @@ class PlanGenerationTestSuite test("function lit") { simple.select( fn.lit(fn.col("id")), - fn.lit('id), + fn.lit(Symbol("id")), fn.lit(true), 
fn.lit(68.toByte), fn.lit(9872.toShort), @@ -3094,7 +3094,7 @@ class PlanGenerationTestSuite test("function typedLit") { simple.select( fn.typedLit(fn.col("id")), - fn.typedLit('id), + fn.typedLit(Symbol("id")), fn.typedLit(1), fn.typedLit[String](null), fn.typedLit(true), diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala index 2e258a356fcb..53743feb03bb 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala @@ -48,7 +48,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with BeforeAndAfterAll { import spark.implicits._ def assertEqual(left: Column, right: Column): Unit = assert(left == right) assertEqual($"x", Column("x")) - assertEqual('y, Column("y")) + assertEqual(Symbol("y"), Column("y")) } test("test implicit encoder resolution") { diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala index bd0aabc2342f..baf65e7bb330 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala @@ -111,7 +111,7 @@ class UserDefinedFunctionE2ETestSuite extends QueryTest { val result2 = Seq((1, "a b c"), (2, "a b"), (3, "a")) .toDF("number", "letters") - .explode('letters) { case Row(letters: String) => + .explode(Symbol("letters")) { case Row(letters: String) => letters.split(' ').map(Tuple1.apply).toSeq } .as[(Int, String, String)] diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala index 33dea4df181f..5362ac9795b9 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala @@ -571,7 +571,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot assert(actualMessage.getField(messageDescriptor.findFieldByName("col_3")) == 0) val fromProtoDf = toProtobuf.select( - functions.from_protobuf($"to_proto", "requiredMsg", testFileDesc) as 'from_proto) + functions.from_protobuf($"to_proto", "requiredMsg", testFileDesc) as Symbol("from_proto")) assert(fromProtoDf.select("from_proto.key").take(1).toSeq(0).get(0) == inputDf.select("requiredMsg.key").take(1).toSeq(0).get(0)) @@ -601,12 +601,15 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val df = Seq(basicMessage.toByteArray).toDF("value") val resultFrom = df - .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample) + .select(from_protobuf_wrapper($"value", "BasicMessage", + Some(testFileDesc)) as Symbol("sample")) .where("sample.string_value == \"slam\"") val resultToFrom = resultFrom - .select(to_protobuf_wrapper($"sample", "BasicMessage", Some(testFileDesc)) as 'value) - .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample) + .select(to_protobuf_wrapper($"sample", "BasicMessage", + Some(testFileDesc)) as Symbol("value")) + .select(from_protobuf_wrapper($"value", "BasicMessage", + Some(testFileDesc)) as Symbol("sample")) .where("sample.string_value == \"slam\"") assert(resultFrom.except(resultToFrom).isEmpty) @@ -633,10 +636,12 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot checkWithFileAndClassName("timeStampMsg") { case (name, descFilePathOpt) => val toProtoDf = inputDf - 
.select(to_protobuf_wrapper($"timeStampMsg", name, descFilePathOpt) as 'to_proto) + .select(to_protobuf_wrapper($"timeStampMsg", name, + descFilePathOpt) as Symbol("to_proto")) val fromProtoDf = toProtoDf - .select(from_protobuf_wrapper($"to_proto", name, descFilePathOpt) as 'timeStampMsg) + .select(from_protobuf_wrapper($"to_proto", name, + descFilePathOpt) as Symbol("timeStampMsg")) val actualFields = fromProtoDf.schema.fields.toList val expectedFields = inputDf.schema.fields.toList @@ -674,10 +679,12 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot checkWithFileAndClassName("durationMsg") { case (name, descFilePathOpt) => val toProtoDf = inputDf - .select(to_protobuf_wrapper($"durationMsg", name, descFilePathOpt) as 'to_proto) + .select(to_protobuf_wrapper($"durationMsg", name, + descFilePathOpt) as Symbol("to_proto")) val fromProtoDf = toProtoDf - .select(from_protobuf_wrapper($"to_proto", name, descFilePathOpt) as 'durationMsg) + .select(from_protobuf_wrapper($"to_proto", name, + descFilePathOpt) as Symbol("durationMsg")) val actualFields = fromProtoDf.schema.fields.toList val expectedFields = inputDf.schema.fields.toList @@ -696,7 +703,8 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val descWithoutImports = descriptorSetWithoutImports(testFileDesc, "BasicMessage") val e = intercept[AnalysisException] { - df.select(functions.from_protobuf($"value", "BasicMessage", descWithoutImports) as 'sample) + df.select(functions.from_protobuf($"value", "BasicMessage", + descWithoutImports) as Symbol("sample")) .where("sample.string_value == \"slam\"").show() } checkError( @@ -719,11 +727,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot checkWithFileAndClassName("OneOfEvent") { case (name, descFilePathOpt) => val fromProtoDf = df.select( - from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample) + from_protobuf_wrapper($"value", name, descFilePathOpt) as 
Symbol("sample")) val toDf = fromProtoDf.select( - to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto) + to_protobuf_wrapper($"sample", name, descFilePathOpt) as Symbol("toProto")) val toFromDf = toDf.select( - from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto) + from_protobuf_wrapper($"toProto", name, descFilePathOpt) as Symbol("fromToProto")) checkAnswer(fromProtoDf, toFromDf) val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name) descriptor.getFields.asScala.map(f => { @@ -767,7 +775,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot ) val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) val dataDfToProto = dataDf.select( - to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto) + to_protobuf_wrapper($"sample", name, descFilePathOpt) as Symbol("toProto")) val toProtoResults = dataDfToProto.select("toProto").collect() val eventFromSparkSchema = OneOfEvent.parseFrom(toProtoResults(0).getAs[Array[Byte]](0)) @@ -810,15 +818,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot options.put("recursive.fields.max.depth", "2") val fromProtoDf = df.select( - functions.from_protobuf($"protoEvent", "Employee", testFileDesc, options) as 'sample) + functions.from_protobuf($"protoEvent", "Employee", testFileDesc, + options) as Symbol("sample")) val toDf = fromProtoDf.select( - functions.to_protobuf($"sample", "Employee", testFileDesc) as 'toProto) + functions.to_protobuf($"sample", "Employee", testFileDesc) as Symbol("toProto")) val toFromDf = toDf.select( functions.from_protobuf($"toProto", "Employee", testFileDesc, - options) as 'fromToProto) + options) as Symbol("fromToProto")) checkAnswer(fromProtoDf, toFromDf) @@ -867,14 +876,15 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val fromProtoDf = df.select( functions.from_protobuf($"value", 
"OneOfEventWithRecursion", - testFileDesc, options) as 'sample) + testFileDesc, options) as Symbol("sample")) val toDf = fromProtoDf.select( - functions.to_protobuf($"sample", "OneOfEventWithRecursion", testFileDesc) as 'toProto) + functions.to_protobuf($"sample", "OneOfEventWithRecursion", + testFileDesc) as Symbol("toProto")) val toFromDf = toDf.select( functions.from_protobuf($"toProto", "OneOfEventWithRecursion", testFileDesc, - options) as 'fromToProto) + options) as Symbol("fromToProto")) checkAnswer(fromProtoDf, toFromDf) @@ -951,8 +961,8 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot ) ) val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) - val dataDfToProto = dataDf.select( - functions.to_protobuf($"sample", "OneOfEventWithRecursion", testFileDesc) as 'toProto) + val dataDfToProto = dataDf.select(functions.to_protobuf($"sample", + "OneOfEventWithRecursion", testFileDesc) as Symbol("toProto")) val eventFromSparkSchema = OneOfEventWithRecursion.parseFrom( dataDfToProto.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0)) @@ -1597,7 +1607,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot options: java.util.HashMap[String, String], messageName: String): Unit = { val fromProtoDf = df.select( - functions.from_protobuf($"value", messageName, testFileDesc, options) as 'sample) + functions.from_protobuf($"value", messageName, testFileDesc, options) as Symbol("sample")) assert(expectedDf.schema === fromProtoDf.schema) checkAnswer(fromProtoDf, expectedDf) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index b35c576c3b9c..ae8481a852bf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -245,7 +245,7 @@ 
abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) val provider = new FsHistoryProvider(conf) - val mergeApplicationListing = PrivateMethod[Unit]('mergeApplicationListing) + val mergeApplicationListing = PrivateMethod[Unit](Symbol("mergeApplicationListing")) val inProgressFile = newLogFile("app1", None, inProgress = true) val logAppender1 = new LogAppender("in-progress and final event log files does not exist") diff --git a/pom.xml b/pom.xml index ededd5f207cb..848ed252a88d 100644 --- a/pom.xml +++ b/pom.xml @@ -3010,6 +3010,10 @@ Or use `-Wconf:msg=legacy-binding:s` to silence this warning. [quickfixable]" --> <arg>-Wconf:msg=legacy-binding:s</arg> + <!-- + SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. + --> + <arg>-Wconf:cat=deprecation&msg=symbol literal is deprecated:e</arg> </args> <jvmArgs> <jvmArg>-Xss128m</jvmArg> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 82a263d4058a..6e87cab6df81 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -263,7 +263,9 @@ object SparkBuild extends PomBuild { // from a superclass shadow symbols defined in an outer scope. Such references are // ambiguous in Scala 3. To continue using the inherited symbol, write `this.stop`. // Or use `-Wconf:msg=legacy-binding:s` to silence this warning. [quickfixable]" - "-Wconf:msg=legacy-binding:s" + "-Wconf:msg=legacy-binding:s", + // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. 
+ "-Wconf:cat=deprecation&msg=symbol literal is deprecated:e" ) } ) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala index 5ffb45084184..3b185adabc3f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala @@ -179,7 +179,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(function, windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn2")) - .where('rn < 2 && 'rn2 === 3) + .where(Symbol("rn") < 2 && Symbol("rn2") === 3) val correctAnswer = testRelation @@ -189,7 +189,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(function, windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn2")) - .where('rn < 2 && 'rn2 === 3) + .where(Symbol("rn") < 2 && Symbol("rn2") === 3) comparePlans( Optimize.execute(originalQuery.analyze), @@ -205,7 +205,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(Rank(c :: Nil), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2) + .where(Symbol("rn") < 2) val correctAnswer = testRelation @@ -215,7 +215,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(Rank(c :: Nil), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2) + .where(Symbol("rn") < 2) comparePlans( Optimize.execute(originalQuery.analyze), @@ -230,7 +230,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(new NthValue(c, 
Literal(1)), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2) + .where(Symbol("rn") < 2) val correctAnswer = testRelation @@ -240,7 +240,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(new NthValue(c, Literal(1)), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2) + .where(Symbol("rn") < 2) comparePlans( Optimize.execute(originalQuery.analyze), @@ -255,7 +255,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(Rank(c :: Nil), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2 && 'rank === 3) + .where(Symbol("rn") < 2 && Symbol("rank") === 3) val correctAnswer = testRelation @@ -265,7 +265,7 @@ class InferWindowGroupLimitSuite extends PlanTest { windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), windowExpr(Rank(c :: Nil), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank")) - .where('rn < 2 && 'rank === 3) + .where(Symbol("rn") < 2 && Symbol("rank") === 3) comparePlans( Optimize.execute(originalQuery.analyze), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala index 8af0e02855b1..941be5f0c4e7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala @@ -35,7 +35,7 @@ class MergeScalarSubqueriesSuite extends PlanTest { val batches = Batch("MergeScalarSubqueries", Once, MergeScalarSubqueries) :: Nil } - val testRelation = LocalRelation('a.int, 'b.int, 'c.string) + val testRelation = LocalRelation(Symbol("a").int, Symbol("b").int, Symbol("c").string) private def definitionNode(plan: 
LogicalPlan, cteIndex: Int) = { CTERelationDef(plan, cteIndex, underSubquery = true) @@ -47,12 +47,12 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Merging subqueries with projects") { - val subquery1 = ScalarSubquery(testRelation.select(('a + 1).as("a_plus1"))) - val subquery2 = ScalarSubquery(testRelation.select(('a + 2).as("a_plus2"))) - val subquery3 = ScalarSubquery(testRelation.select('b)) - val subquery4 = ScalarSubquery(testRelation.select(('a + 1).as("a_plus1_2"))) - val subquery5 = ScalarSubquery(testRelation.select(('a + 2).as("a_plus2_2"))) - val subquery6 = ScalarSubquery(testRelation.select('b.as("b_2"))) + val subquery1 = ScalarSubquery(testRelation.select((Symbol("a") + 1).as("a_plus1"))) + val subquery2 = ScalarSubquery(testRelation.select((Symbol("a") + 2).as("a_plus2"))) + val subquery3 = ScalarSubquery(testRelation.select(Symbol("b"))) + val subquery4 = ScalarSubquery(testRelation.select((Symbol("a") + 1).as("a_plus1_2"))) + val subquery5 = ScalarSubquery(testRelation.select((Symbol("a") + 2).as("a_plus2_2"))) + val subquery6 = ScalarSubquery(testRelation.select(Symbol("b").as("b_2"))) val originalQuery = testRelation .select( subquery1, @@ -64,14 +64,14 @@ class MergeScalarSubqueriesSuite extends PlanTest { val mergedSubquery = testRelation .select( - ('a + 1).as("a_plus1"), - ('a + 2).as("a_plus2"), - 'b) + (Symbol("a") + 1).as("a_plus1"), + (Symbol("a") + 2).as("a_plus2"), + Symbol("b")) .select( CreateNamedStruct(Seq( - Literal("a_plus1"), 'a_plus1, - Literal("a_plus2"), 'a_plus2, - Literal("b"), 'b + Literal("a_plus1"), Symbol("a_plus1"), + Literal("a_plus2"), Symbol("a_plus2"), + Literal("b"), Symbol("b") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -89,12 +89,14 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Merging subqueries with aggregates") { - val subquery1 = ScalarSubquery(testRelation.groupBy('b)(max('a).as("max_a"))) - val subquery2 = 
ScalarSubquery(testRelation.groupBy('b)(sum('a).as("sum_a"))) - val subquery3 = ScalarSubquery(testRelation.groupBy('b)('b)) - val subquery4 = ScalarSubquery(testRelation.groupBy('b)(max('a).as("max_a_2"))) - val subquery5 = ScalarSubquery(testRelation.groupBy('b)(sum('a).as("sum_a_2"))) - val subquery6 = ScalarSubquery(testRelation.groupBy('b)('b.as("b_2"))) + val subquery1 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(max(Symbol("a")).as("max_a"))) + val subquery2 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(sum(Symbol("a")).as("sum_a"))) + val subquery3 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(Symbol("b"))) + val subquery4 = ScalarSubquery( + testRelation.groupBy(Symbol("b"))(max(Symbol("a")).as("max_a_2"))) + val subquery5 = ScalarSubquery( + testRelation.groupBy(Symbol("b"))(sum(Symbol("a")).as("sum_a_2"))) + val subquery6 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(Symbol("b").as("b_2"))) val originalQuery = testRelation .select( subquery1, @@ -105,14 +107,14 @@ class MergeScalarSubqueriesSuite extends PlanTest { subquery6) val mergedSubquery = testRelation - .groupBy('b)( - max('a).as("max_a"), - sum('a).as("sum_a"), - 'b) + .groupBy(Symbol("b"))( + max(Symbol("a")).as("max_a"), + sum(Symbol("a")).as("sum_a"), + Symbol("b")) .select(CreateNamedStruct(Seq( - Literal("max_a"), 'max_a, - Literal("sum_a"), 'sum_a, - Literal("b"), 'b + Literal("max_a"), Symbol("max_a"), + Literal("sum_a"), Symbol("sum_a"), + Literal("b"), Symbol("b") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -130,11 +132,13 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Merging subqueries with aggregates with complex grouping expressions") { - val subquery1 = ScalarSubquery(testRelation.groupBy('b > 1 && 'a === 2)(max('a).as("max_a"))) + val subquery1 = ScalarSubquery(testRelation.groupBy( + Symbol("b") > 1 && Symbol("a") === 2)(max(Symbol("a")).as("max_a"))) val subquery2 = ScalarSubquery( 
testRelation - .select('a, 'b.as("b_2")) - .groupBy(Literal(2) === 'a && Literal(1) < 'b_2)(sum('a).as("sum_a"))) + .select(Symbol("a"), Symbol("b").as("b_2")) + .groupBy(Literal(2) === Symbol("a") && + Literal(1) < Symbol("b_2"))(sum(Symbol("a")).as("sum_a"))) val originalQuery = testRelation .select( @@ -142,13 +146,13 @@ class MergeScalarSubqueriesSuite extends PlanTest { subquery2) val mergedSubquery = testRelation - .select('a, 'b, 'c) - .groupBy('b > 1 && 'a === 2)( - max('a).as("max_a"), - sum('a).as("sum_a")) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .groupBy(Symbol("b") > 1 && Symbol("a") === 2)( + max(Symbol("a")).as("max_a"), + sum(Symbol("a")).as("sum_a")) .select(CreateNamedStruct(Seq( - Literal("max_a"), 'max_a, - Literal("sum_a"), 'sum_a + Literal("max_a"), Symbol("max_a"), + Literal("sum_a"), Symbol("sum_a") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -163,8 +167,10 @@ class MergeScalarSubqueriesSuite extends PlanTest { test("Merging subqueries with aggregates with multiple grouping expressions") { // supports HashAggregate - val subquery1 = ScalarSubquery(testRelation.groupBy('b, 'c)(max('a).as("max_a"))) - val subquery2 = ScalarSubquery(testRelation.groupBy('b, 'c)(min('a).as("min_a"))) + val subquery1 = ScalarSubquery(testRelation.groupBy(Symbol("b"), + Symbol("c"))(max(Symbol("a")).as("max_a"))) + val subquery2 = ScalarSubquery(testRelation.groupBy(Symbol("b"), + Symbol("c"))(min(Symbol("a")).as("min_a"))) val originalQuery = testRelation .select( @@ -172,12 +178,12 @@ class MergeScalarSubqueriesSuite extends PlanTest { subquery2) val hashAggregates = testRelation - .groupBy('b, 'c)( - max('a).as("max_a"), - min('a).as("min_a")) + .groupBy(Symbol("b"), Symbol("c"))( + max(Symbol("a")).as("max_a"), + min(Symbol("a")).as("min_a")) .select(CreateNamedStruct(Seq( - Literal("max_a"), 'max_a, - Literal("min_a"), 'min_a + Literal("max_a"), Symbol("max_a"), + Literal("min_a"), 
Symbol("min_a") )).as("mergedValue")) val analyzedHashAggregates = hashAggregates.analyze val correctAnswer = WithCTE( @@ -191,14 +197,16 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Merging subqueries with filters") { - val subquery1 = ScalarSubquery(testRelation.where('a > 1).select('a)) + val subquery1 = ScalarSubquery(testRelation.where(Symbol("a") > 1).select(Symbol("a"))) // Despite having an extra Project node, `subquery2` is mergeable with `subquery1` - val subquery2 = ScalarSubquery(testRelation.where('a > 1).select('b.as("b_1")).select('b_1)) + val subquery2 = ScalarSubquery(testRelation.where(Symbol("a") > 1).select( + Symbol("b").as("b_1")).select(Symbol("b_1"))) // Despite lacking a Project node, `subquery3` is mergeable with the result of merging // `subquery1` and `subquery2` - val subquery3 = ScalarSubquery(testRelation.select('a.as("a_2")).where('a_2 > 1).select('a_2)) - val subquery4 = ScalarSubquery( - testRelation.select('a.as("a_2"), 'b).where('a_2 > 1).select('b.as("b_2"))) + val subquery3 = ScalarSubquery(testRelation.select( + Symbol("a").as("a_2")).where(Symbol("a_2") > 1).select(Symbol("a_2"))) + val subquery4 = ScalarSubquery(testRelation.select( + Symbol("a").as("a_2"), Symbol("b")).where(Symbol("a_2") > 1).select(Symbol("b").as("b_2"))) val originalQuery = testRelation .select( subquery1, @@ -207,13 +215,13 @@ class MergeScalarSubqueriesSuite extends PlanTest { subquery4) val mergedSubquery = testRelation - .select('a, 'b, 'c) - .where('a > 1) - .select('a, 'b, 'c) - .select('a, 'b) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .where(Symbol("a") > 1) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .select(Symbol("a"), Symbol("b")) .select(CreateNamedStruct(Seq( - Literal("a"), 'a, - Literal("b"), 'b + Literal("a"), Symbol("a"), + Literal("b"), Symbol("b") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -229,24 +237,25 @@ class MergeScalarSubqueriesSuite 
extends PlanTest { } test("Merging subqueries with complex filter conditions") { - val subquery1 = ScalarSubquery(testRelation.where('a > 1 && 'b === 2).select('a)) + val subquery1 = ScalarSubquery( + testRelation.where(Symbol("a") > 1 && Symbol("b") === 2).select(Symbol("a"))) val subquery2 = ScalarSubquery( testRelation - .select('a.as("a_2"), 'b) - .where(Literal(2) === 'b && Literal(1) < 'a_2) - .select('b.as("b_2"))) + .select(Symbol("a").as("a_2"), Symbol("b")) + .where(Literal(2) === Symbol("b") && Literal(1) < Symbol("a_2")) + .select(Symbol("b").as("b_2"))) val originalQuery = testRelation .select( subquery1, subquery2) val mergedSubquery = testRelation - .select('a, 'b, 'c) - .where('a > 1 && 'b === 2) - .select('a, 'b.as("b_2")) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .where(Symbol("a") > 1 && Symbol("b") === 2) + .select(Symbol("a"), Symbol("b").as("b_2")) .select(CreateNamedStruct(Seq( - Literal("a"), 'a, - Literal("b_2"), 'b_2 + Literal("a"), Symbol("a"), + Literal("b_2"), Symbol("b_2") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -260,8 +269,8 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Do not merge subqueries with different filter conditions") { - val subquery1 = ScalarSubquery(testRelation.where('a > 1).select('a)) - val subquery2 = ScalarSubquery(testRelation.where('a < 1).select('a)) + val subquery1 = ScalarSubquery(testRelation.where(Symbol("a") > 1).select(Symbol("a"))) + val subquery2 = ScalarSubquery(testRelation.where(Symbol("a") < 1).select(Symbol("a"))) val originalQuery = testRelation .select( @@ -273,23 +282,23 @@ class MergeScalarSubqueriesSuite extends PlanTest { test("Merging subqueries with aggregate filters") { val subquery1 = ScalarSubquery( - testRelation.having('b)(max('a).as("max_a"))(max('a) > 1)) + testRelation.having(Symbol("b"))(max(Symbol("a")).as("max_a"))(max(Symbol("a")) > 1)) val subquery2 = ScalarSubquery( - 
testRelation.having('b)(sum('a).as("sum_a"))(max('a) > 1)) + testRelation.having(Symbol("b"))(sum(Symbol("a")).as("sum_a"))(max(Symbol("a")) > 1)) val originalQuery = testRelation.select( subquery1, subquery2) val mergedSubquery = testRelation - .having('b)( - max('a).as("max_a"), - sum('a).as("sum_a"))('max_a > 1) + .having(Symbol("b"))( + max(Symbol("a")).as("max_a"), + sum(Symbol("a")).as("sum_a"))(Symbol("max_a") > 1) .select( - 'max_a, - 'sum_a) + Symbol("max_a"), + Symbol("sum_a")) .select(CreateNamedStruct(Seq( - Literal("max_a"), 'max_a, - Literal("sum_a"), 'sum_a + Literal("max_a"), Symbol("max_a"), + Literal("sum_a"), Symbol("sum_a") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -310,26 +319,27 @@ class MergeScalarSubqueriesSuite extends PlanTest { Some($"t1.b" === $"t2.b")) .select($"t1.a").analyze) val subquery2 = ScalarSubquery(testRelation.as("t1") - .select('a.as("a_1"), 'b.as("b_1"), 'c.as("c_1")) + .select(Symbol("a").as("a_1"), Symbol("b").as("b_1"), Symbol("c").as("c_1")) .join( - testRelation.as("t2").select('a.as("a_2"), 'b.as("b_2"), 'c.as("c_2")), + testRelation.as("t2").select(Symbol("a").as("a_2"), Symbol("b").as("b_2"), + Symbol("c").as("c_2")), Inner, - Some('b_1 === 'b_2)) - .select('c_2).analyze) + Some(Symbol("b_1") === Symbol("b_2"))) + .select(Symbol("c_2")).analyze) val originalQuery = testRelation.select( subquery1, subquery2) val mergedSubquery = testRelation.as("t1") - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .join( - testRelation.as("t2").select('a, 'b, 'c), + testRelation.as("t2").select(Symbol("a"), Symbol("b"), Symbol("c")), Inner, Some($"t1.b" === $"t2.b")) .select($"t1.a", $"t2.c") .select(CreateNamedStruct(Seq( - Literal("a"), 'a, - Literal("c"), 'c + Literal("a"), Symbol("a"), + Literal("c"), Symbol("c") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -350,26 +360,27 @@ class 
MergeScalarSubqueriesSuite extends PlanTest { Some($"t1.b" < $"t2.b" && $"t1.a" === $"t2.c")) .select($"t1.a").analyze) val subquery2 = ScalarSubquery(testRelation.as("t1") - .select('a.as("a_1"), 'b.as("b_1"), 'c.as("c_1")) + .select(Symbol("a").as("a_1"), Symbol("b").as("b_1"), Symbol("c").as("c_1")) .join( - testRelation.as("t2").select('a.as("a_2"), 'b.as("b_2"), 'c.as("c_2")), + testRelation.as("t2").select(Symbol("a").as("a_2"), Symbol("b").as("b_2"), + Symbol("c").as("c_2")), Inner, - Some('c_2 === 'a_1 && 'b_1 < 'b_2)) - .select('c_2).analyze) + Some(Symbol("c_2") === Symbol("a_1") && Symbol("b_1") < Symbol("b_2"))) + .select(Symbol("c_2")).analyze) val originalQuery = testRelation.select( subquery1, subquery2) val mergedSubquery = testRelation.as("t1") - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .join( - testRelation.as("t2").select('a, 'b, 'c), + testRelation.as("t2").select(Symbol("a"), Symbol("b"), Symbol("c")), Inner, Some($"t1.b" < $"t2.b" && $"t1.a" === $"t2.c")) .select($"t1.a", $"t2.c") .select(CreateNamedStruct(Seq( - Literal("a"), 'a, - Literal("c"), 'c + Literal("a"), Symbol("a"), + Literal("c"), Symbol("c") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( @@ -423,8 +434,10 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Do not merge subqueries with nondeterministic elements") { - val subquery1 = ScalarSubquery(testRelation.select(('a + rand(0)).as("rand_a"))) - val subquery2 = ScalarSubquery(testRelation.select(('b + rand(0)).as("rand_b"))) + val subquery1 = ScalarSubquery( + testRelation.select((Symbol("a") + rand(0)).as("rand_a"))) + val subquery2 = ScalarSubquery( + testRelation.select((Symbol("b") + rand(0)).as("rand_b"))) val originalQuery = testRelation .select( subquery1, @@ -432,8 +445,10 @@ class MergeScalarSubqueriesSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze) - val subquery3 = 
ScalarSubquery(testRelation.where('a > rand(0)).select('a)) - val subquery4 = ScalarSubquery(testRelation.where('a > rand(0)).select('b)) + val subquery3 = ScalarSubquery( + testRelation.where(Symbol("a") > rand(0)).select(Symbol("a"))) + val subquery4 = ScalarSubquery( + testRelation.where(Symbol("a") > rand(0)).select(Symbol("b"))) val originalQuery2 = testRelation .select( subquery3, @@ -441,8 +456,10 @@ class MergeScalarSubqueriesSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery2.analyze), originalQuery2.analyze) - val subquery5 = ScalarSubquery(testRelation.groupBy()((max('a) + rand(0)).as("max_a"))) - val subquery6 = ScalarSubquery(testRelation.groupBy()((max('b) + rand(0)).as("max_b"))) + val subquery5 = ScalarSubquery( + testRelation.groupBy()((max(Symbol("a")) + rand(0)).as("max_a"))) + val subquery6 = ScalarSubquery( + testRelation.groupBy()((max(Symbol("b")) + rand(0)).as("max_b"))) val originalQuery3 = testRelation .select( subquery5, @@ -453,18 +470,20 @@ class MergeScalarSubqueriesSuite extends PlanTest { test("Do not merge different aggregate implementations") { // supports HashAggregate - val subquery1 = ScalarSubquery(testRelation.groupBy('b)(max('a).as("max_a"))) - val subquery2 = ScalarSubquery(testRelation.groupBy('b)(min('a).as("min_a"))) + val subquery1 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(max(Symbol("a")).as("max_a"))) + val subquery2 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(min(Symbol("a")).as("min_a"))) // supports ObjectHashAggregate val subquery3 = ScalarSubquery(testRelation - .groupBy('b)(CollectList('a).toAggregateExpression(isDistinct = false).as("collectlist_a"))) + .groupBy(Symbol("b"))(CollectList(Symbol("a")). + toAggregateExpression(isDistinct = false).as("collectlist_a"))) val subquery4 = ScalarSubquery(testRelation - .groupBy('b)(CollectSet('a).toAggregateExpression(isDistinct = false).as("collectset_a"))) + .groupBy(Symbol("b"))(CollectSet(Symbol("a")). 
+ toAggregateExpression(isDistinct = false).as("collectset_a"))) // supports SortAggregate - val subquery5 = ScalarSubquery(testRelation.groupBy('b)(max('c).as("max_c"))) - val subquery6 = ScalarSubquery(testRelation.groupBy('b)(min('c).as("min_c"))) + val subquery5 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(max(Symbol("c")).as("max_c"))) + val subquery6 = ScalarSubquery(testRelation.groupBy(Symbol("b"))(min(Symbol("c")).as("min_c"))) val originalQuery = testRelation .select( @@ -476,30 +495,30 @@ class MergeScalarSubqueriesSuite extends PlanTest { subquery6) val hashAggregates = testRelation - .groupBy('b)( - max('a).as("max_a"), - min('a).as("min_a")) + .groupBy(Symbol("b"))( + max(Symbol("a")).as("max_a"), + min(Symbol("a")).as("min_a")) .select(CreateNamedStruct(Seq( - Literal("max_a"), 'max_a, - Literal("min_a"), 'min_a + Literal("max_a"), Symbol("max_a"), + Literal("min_a"), Symbol("min_a") )).as("mergedValue")) val analyzedHashAggregates = hashAggregates.analyze val objectHashAggregates = testRelation - .groupBy('b)( - CollectList('a).toAggregateExpression(isDistinct = false).as("collectlist_a"), - CollectSet('a).toAggregateExpression(isDistinct = false).as("collectset_a")) + .groupBy(Symbol("b"))( + CollectList(Symbol("a")).toAggregateExpression(isDistinct = false).as("collectlist_a"), + CollectSet(Symbol("a")).toAggregateExpression(isDistinct = false).as("collectset_a")) .select(CreateNamedStruct(Seq( - Literal("collectlist_a"), 'collectlist_a, - Literal("collectset_a"), 'collectset_a + Literal("collectlist_a"), Symbol("collectlist_a"), + Literal("collectset_a"), Symbol("collectset_a") )).as("mergedValue")) val analyzedObjectHashAggregates = objectHashAggregates.analyze val sortAggregates = testRelation - .groupBy('b)( - max('c).as("max_c"), - min('c).as("min_c")) + .groupBy(Symbol("b"))( + max(Symbol("c")).as("max_c"), + min(Symbol("c")).as("min_c")) .select(CreateNamedStruct(Seq( - Literal("max_c"), 'max_c, - Literal("min_c"), 'min_c + 
Literal("max_c"), Symbol("max_c"), + Literal("min_c"), Symbol("min_c") )).as("mergedValue")) val analyzedSortAggregates = sortAggregates.analyze val correctAnswer = WithCTE( @@ -521,8 +540,10 @@ class MergeScalarSubqueriesSuite extends PlanTest { test("Do not merge subqueries with different aggregate grouping orders") { // supports HashAggregate - val subquery1 = ScalarSubquery(testRelation.groupBy('b, 'c)(max('a).as("max_a"))) - val subquery2 = ScalarSubquery(testRelation.groupBy('c, 'b)(min('a).as("min_a"))) + val subquery1 = ScalarSubquery( + testRelation.groupBy(Symbol("b"), Symbol("c"))(max(Symbol("a")).as("max_a"))) + val subquery2 = ScalarSubquery( + testRelation.groupBy(Symbol("c"), Symbol("b"))(min(Symbol("a")).as("min_a"))) val originalQuery = testRelation .select( @@ -533,12 +554,12 @@ class MergeScalarSubqueriesSuite extends PlanTest { } test("Merging subqueries from different places") { - val subquery1 = ScalarSubquery(testRelation.select(('a + 1).as("a_plus1"))) - val subquery2 = ScalarSubquery(testRelation.select(('a + 2).as("a_plus2"))) - val subquery3 = ScalarSubquery(testRelation.select('b)) - val subquery4 = ScalarSubquery(testRelation.select(('a + 1).as("a_plus1_2"))) - val subquery5 = ScalarSubquery(testRelation.select(('a + 2).as("a_plus2_2"))) - val subquery6 = ScalarSubquery(testRelation.select('b.as("b_2"))) + val subquery1 = ScalarSubquery(testRelation.select((Symbol("a") + 1).as("a_plus1"))) + val subquery2 = ScalarSubquery(testRelation.select((Symbol("a") + 2).as("a_plus2"))) + val subquery3 = ScalarSubquery(testRelation.select(Symbol("b"))) + val subquery4 = ScalarSubquery(testRelation.select((Symbol("a") + 1).as("a_plus1_2"))) + val subquery5 = ScalarSubquery(testRelation.select((Symbol("a") + 2).as("a_plus2_2"))) + val subquery6 = ScalarSubquery(testRelation.select(Symbol("b").as("b_2"))) val originalQuery = testRelation .select( subquery1, @@ -551,14 +572,14 @@ class MergeScalarSubqueriesSuite extends PlanTest { val mergedSubquery = 
testRelation .select( - ('a + 1).as("a_plus1"), - ('a + 2).as("a_plus2"), - 'b) + (Symbol("a") + 1).as("a_plus1"), + (Symbol("a") + 2).as("a_plus2"), + Symbol("b")) .select( CreateNamedStruct(Seq( - Literal("a_plus1"), 'a_plus1, - Literal("a_plus2"), 'a_plus2, - Literal("b"), 'b + Literal("a_plus1"), Symbol("a_plus1"), + Literal("a_plus2"), Symbol("a_plus2"), + Literal("b"), Symbol("b") )).as("mergedValue")) val analyzedMergedSubquery = mergedSubquery.analyze val correctAnswer = WithCTE( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala index cb6b9ac8d8be..bd0cc6216f7a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala @@ -821,15 +821,15 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { "b struct<c: struct<d: int, e: int>, c2 int>" ) val input = LocalRelation( - 'id.int, - 'col1.array(ArrayType(inputType))) + Symbol("id").int, + Symbol("col1").array(ArrayType(inputType))) val query = input - .generate(Explode('col1)) + .generate(Explode(Symbol("col1"))) .select( UnresolvedExtractValue( UnresolvedExtractValue( - CaseWhen(Seq(('col.getField("a") === 1, + CaseWhen(Seq((Symbol("col").getField("a") === 1, Literal.default(simpleStruct)))), Literal("b")), Literal("c")).as("result")) @@ -840,10 +840,10 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { // Only the inner-most col.a should be pushed down. 
val expected = input - .select('col1.getField("a").as(aliases(0))) + .select(Symbol("col1").getField("a").as(aliases(0))) .generate(Explode($"${aliases(0)}"), unrequiredChildIndex = Seq(0)) .select(UnresolvedExtractValue(UnresolvedExtractValue( - CaseWhen(Seq(('col === 1, + CaseWhen(Seq((Symbol("col") === 1, Literal.default(simpleStruct)))), Literal("b")), Literal("c")).as("result")) .analyze @@ -853,10 +853,12 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { test("SPARK-38529: GeneratorNestedColumnAliasing does not pushdown for non-Explode") { val employer = StructType.fromDDL("id int, company struct<name:string, address:string>") val input = LocalRelation( - 'col1.int, - 'col2.array(ArrayType(StructType.fromDDL("field1 struct<col1: int, col2: int>, field2 int"))) + Symbol("col1").int, + Symbol("col2").array( + ArrayType(StructType.fromDDL("field1 struct<col1: int, col2: int>, field2 int"))) ) - val plan = input.generate(Inline('col2)).select('field1.getField("col1")).analyze + val plan = input.generate( + Inline(Symbol("col2"))).select(Symbol("field1").getField("col1")).analyze val optimized = GeneratorNestedColumnAliasing.unapply(plan) // The plan is expected to be unchanged. 
comparePlans(plan, RemoveNoopOperators.apply(optimized.get)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala index e40fff22bc1c..590fb323000b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala @@ -104,8 +104,8 @@ class OptimizerSuite extends PlanTest { } test("Optimizer per rule validation catches invalid grouping types") { - val analyzed = LocalRelation('a.map(IntegerType, IntegerType)) - .select('a).analyze + val analyzed = LocalRelation(Symbol("a").map(IntegerType, IntegerType)) + .select(Symbol("a")).analyze /** * A dummy optimizer rule for testing that invalid grouping types are not allowed. @@ -128,8 +128,8 @@ class OptimizerSuite extends PlanTest { } test("Optimizer per rule validation catches invalid aggregation expressions") { - val analyzed = LocalRelation('a.long, 'b.long) - .select('a, 'b).analyze + val analyzed = LocalRelation(Symbol("a").long, Symbol("b").long) + .select(Symbol("a"), Symbol("b")).analyze /** * A dummy optimizer rule for testing that a non grouping key reference diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala index 7e45fc5aeb3b..9af73158ee73 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala @@ -39,51 +39,51 @@ class PushProjectionThroughLimitSuite extends PlanTest { val query1 = testRelation .limit(10) - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(15).analyze val
optimized1 = Optimize.execute(query1) val expected1 = testRelation - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(10).analyze comparePlans(optimized1, expected1) val query2 = testRelation .sortBy($"a".asc) .limit(10) - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(15).analyze val optimized2 = Optimize.execute(query2) val expected2 = testRelation .sortBy($"a".asc) - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(10).analyze comparePlans(optimized2, expected2) val query3 = testRelation .limit(10) - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(20) - .select('a) + .select(Symbol("a")) .limit(15).analyze val optimized3 = Optimize.execute(query3) val expected3 = testRelation - .select('a, 'b, 'c) - .select('a) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .select(Symbol("a")) .limit(10).analyze comparePlans(optimized3, expected3) val query4 = testRelation .sortBy($"a".asc) .limit(10) - .select('a, 'b, 'c) + .select(Symbol("a"), Symbol("b"), Symbol("c")) .limit(20) - .select('a) + .select(Symbol("a")) .limit(15).analyze val optimized4 = Optimize.execute(query4) val expected4 = testRelation .sortBy($"a".asc) - .select('a, 'b, 'c) - .select('a) + .select(Symbol("a"), Symbol("b"), Symbol("c")) + .select(Symbol("a")) .limit(10).analyze comparePlans(optimized4, expected4) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala index a9796141c0c7..8d4c2de10e34 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala @@ -146,16 +146,17 @@ class TransposeWindowSuite extends PlanTest { test("SPARK-38034: transpose two adjacent windows with compatible partitions " + "which is not a prefix") { val query =
testRelation - .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2) - .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1) + .window(Seq(sum(c).as(Symbol("sum_a_2"))), partitionSpec4, orderSpec2) + .window(Seq(sum(c).as(Symbol("sum_a_1"))), partitionSpec3, orderSpec1) val analyzed = query.analyze val optimized = Optimize.execute(analyzed) val correctAnswer = testRelation - .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1) - .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2) - .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1) + .window(Seq(sum(c).as(Symbol("sum_a_1"))), partitionSpec3, orderSpec1) + .window(Seq(sum(c).as(Symbol("sum_a_2"))), partitionSpec4, orderSpec2) + .select(Symbol("a"), Symbol("b"), Symbol("c"), Symbol("d"), + Symbol("sum_a_2"), Symbol("sum_a_1")) comparePlans(optimized, correctAnswer.analyze) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/CastingSyntaxSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/CastingSyntaxSuite.scala index 7f3bb74b2924..577ecf0d61cf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/CastingSyntaxSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/CastingSyntaxSuite.scala @@ -62,14 +62,14 @@ class CastingSyntaxSuite extends AnalysisTest { } test("boolean expressions") { - assertEqual("(a and b) :: int", Cast('a && 'b, IntegerType)) - assertEqual("(a or b) :: int", Cast('a || 'b, IntegerType)) + assertEqual("(a and b) :: int", Cast(Symbol("a") && Symbol("b"), IntegerType)) + assertEqual("(a or b) :: int", Cast(Symbol("a") || Symbol("b"), IntegerType)) } test("arithmetic expressions") { - assertEqual("(a - b) :: int", Cast('a - 'b, IntegerType)) - assertEqual("(a * b) :: int", Cast('a * 'b, IntegerType)) - assertEqual("a + b :: int", 'a + Cast('b, IntegerType)) + assertEqual("(a - b) :: int", Cast(Symbol("a") - Symbol("b"), IntegerType)) + assertEqual("(a * b) :: int", 
Cast(Symbol("a") * Symbol("b"), IntegerType)) + assertEqual("a + b :: int", Symbol("a") + Cast(Symbol("b"), IntegerType)) } test("star expansion") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index a3bdfd07aee3..8266d30d0557 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -119,19 +119,19 @@ class LogicalPlanSuite extends SparkFunSuite { assert(range.maxRows === Some(100)) assert(range.maxRowsPerPartition === Some(34)) - val sort = Sort(Seq('id.asc), false, range) + val sort = Sort(Seq(Symbol("id").asc), false, range) assert(sort.maxRows === Some(100)) assert(sort.maxRowsPerPartition === Some(34)) - val sort2 = Sort(Seq('id.asc), true, range) + val sort2 = Sort(Seq(Symbol("id").asc), true, range) assert(sort2.maxRows === Some(100)) assert(sort2.maxRowsPerPartition === Some(100)) - val c1 = Literal(1).as('a).toAttribute.newInstance().withNullability(true) - val c2 = Literal(2).as('b).toAttribute.newInstance().withNullability(true) + val c1 = Literal(1).as(Symbol("a")).toAttribute.newInstance().withNullability(true) + val c2 = Literal(2).as(Symbol("b")).toAttribute.newInstance().withNullability(true) val expand = Expand( - Seq(Seq(Literal(null), 'b), Seq('a, Literal(null))), + Seq(Seq(Literal(null), Symbol("b")), Seq(Symbol("a"), Literal(null))), Seq(c1, c2), - sort.select('id as 'a, 'id + 1 as 'b)) + sort.select(Symbol("id") as Symbol("a"), Symbol("id") + 1 as Symbol("b"))) assert(expand.maxRows === Some(200)) assert(expand.maxRowsPerPartition === Some(68)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 6dcc03343761..8aad34e2c0ab 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -1451,7 +1451,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest val multipleRowNumbers = df .withColumn("rn", row_number().over(window)) .withColumn("rn2", row_number().over(window)) - .where('rn < 2 && 'rn2 < 3) + .where(Symbol("rn") < 2 && Symbol("rn2") < 3) checkAnswer(multipleRowNumbers, Seq( Row("a", 4, "", 2.0, 1, 1), @@ -1464,7 +1464,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest val multipleRanks = df .withColumn("rn", rank().over(window)) .withColumn("rn2", rank().over(window)) - .where('rn < 2 && 'rn2 < 3) + .where(Symbol("rn") < 2 && Symbol("rn2") < 3) checkAnswer(multipleRanks, Seq( Row("a", 4, "", 2.0, 1, 1), @@ -1479,7 +1479,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest val multipleDenseRanks = df .withColumn("rn", dense_rank().over(window)) .withColumn("rn2", dense_rank().over(window)) - .where('rn < 2 && 'rn2 < 3) + .where(Symbol("rn") < 2 && Symbol("rn2") < 3) checkAnswer(multipleDenseRanks, Seq( Row("a", 4, "", 2.0, 1, 1), @@ -1494,7 +1494,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest val multipleWindows = df .withColumn("rn2", row_number().over(window2)) .withColumn("rn", row_number().over(window)) - .where('rn < 2 && 'rn2 < 3) + .where(Symbol("rn") < 2 && Symbol("rn2") < 3) checkAnswer(multipleWindows, Seq( Row("b", 1, "h", Double.NaN, 2, 1), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index c55d8b79e888..afc152e072ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -315,17 +315,17 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { val df = Seq((1, 2)).toDF("a", "b") 
checkAnswer( - df.select(inline(array(struct('a), struct('a)))), + df.select(inline(array(struct(Symbol("a")), struct(Symbol("a"))))), Row(1) :: Row(1) :: Nil) checkAnswer( - df.select(inline(array(struct('a, 'b), struct('a, 'b)))), + df.select(inline(array(struct(Symbol("a"), Symbol("b")), struct(Symbol("a"), Symbol("b"))))), Row(1, 2) :: Row(1, 2) :: Nil) // Spark think [struct<a:int>, struct<b:int>] is heterogeneous due to name difference. checkError( exception = intercept[AnalysisException] { - df.select(inline(array(struct('a), struct('b)))) + df.select(inline(array(struct(Symbol("a")), struct(Symbol("b"))))) }, errorClass = "DATATYPE_MISMATCH.DATA_DIFF_TYPES", parameters = Map( @@ -334,13 +334,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { "dataType" -> "(\"STRUCT<a: INT>\" or \"STRUCT<b: INT>\")")) checkAnswer( - df.select(inline(array(struct('a), struct('b.alias("a"))))), + df.select(inline(array(struct(Symbol("a")), struct(Symbol("b").alias("a"))))), Row(1) :: Row(2) :: Nil) // Spark think [struct<a:int>, struct<col1:int>] is heterogeneous due to name difference. 
checkError( exception = intercept[AnalysisException] { - df.select(inline(array(struct('a), struct(lit(2))))) + df.select(inline(array(struct(Symbol("a")), struct(lit(2))))) }, errorClass = "DATATYPE_MISMATCH.DATA_DIFF_TYPES", parameters = Map( @@ -349,15 +349,16 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { "dataType" -> "(\"STRUCT<a: INT>\" or \"STRUCT<col1: INT>\")")) checkAnswer( - df.select(inline(array(struct('a), struct(lit(2).alias("a"))))), + df.select(inline(array(struct(Symbol("a")), struct(lit(2).alias("a"))))), Row(1) :: Row(2) :: Nil) checkAnswer( - df.select(struct('a)).select(inline(array("*"))), + df.select(struct(Symbol("a"))).select(inline(array("*"))), Row(1) :: Nil) checkAnswer( - df.select(array(struct('a), struct('b.alias("a")))).selectExpr("inline(*)"), + df.select(array(struct(Symbol("a")), + struct(Symbol("b").alias("a")))).selectExpr("inline(*)"), Row(1) :: Row(2) :: Nil) } @@ -366,11 +367,11 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { val df2 = df.select( when($"col1" === 1, null).otherwise(array(struct($"col1", $"col2"))).as("col1")) checkAnswer( - df2.select(inline('col1)), + df2.select(inline(Symbol("col1"))), Row(3, "4") :: Row(5, "6") :: Nil ) checkAnswer( - df2.select(inline_outer('col1)), + df2.select(inline_outer(Symbol("col1"))), Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil ) } @@ -500,11 +501,11 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { """.stripMargin) checkAnswer( - df.select(inline('b)), + df.select(inline(Symbol("b"))), Row(0, 1) :: Row(null, null) :: Row(2, 3) :: Row(null, null) :: Nil) checkAnswer( - df.select('a, inline('b)), + df.select(Symbol("a"), inline(Symbol("b"))), Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 
09db7721af07..3612f4a7eda8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -283,7 +283,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-43522: Fix creating struct column name with index of array") { - val df = Seq("a=b,c=d,d=f").toDF().withColumn("key_value", split('value, ",")) + val df = Seq("a=b,c=d,d=f").toDF().withColumn("key_value", split(Symbol("value"), ",")) .withColumn("map_entry", transform(col("key_value"), x => struct(split(x, "=") .getItem(0), split(x, "=").getItem(1)))).select("map_entry") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index 9fa7acf4c817..03e56cb95329 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -127,7 +127,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession { StructType(StructField("a", DecimalType(20, 2)) :: Nil)) checkAnswer( inputDf, - (child: SparkPlan) => SortExec('a.asc :: Nil, global = true, child = child), + (child: SparkPlan) => SortExec(Symbol("a").asc :: Nil, global = true, child = child), input.sorted.map(Row(_)), sortAnswers = false) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala index 405c0bbbd1e9..ea1442047d42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala @@ -426,8 +426,8 @@ class MultiStatefulOperatorsSuite val stream = inputDF1.join(inputDF2, expr("v1 >= start2 AND v1 < end2 " + "AND eventTime1 = start2"), "inner") - 
.groupBy(window($"eventTime1", "5 seconds") as 'window) - .agg(count("*") as 'count) + .groupBy(window($"eventTime1", "5 seconds") as Symbol("window")) + .agg(count("*") as Symbol("count")) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) testStream(stream)( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org