This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe2174d8caaf [SPARK-47304][SQL][TESTS] Distribute tests from `DataFrameSuite` to more specific suites
fe2174d8caaf is described below

commit fe2174d8caaf6f9474319a8d729a2f537038b4c1
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Mar 6 18:45:00 2024 +0300

    [SPARK-47304][SQL][TESTS] Distribute tests from `DataFrameSuite` to more specific suites
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to move some tests out of `DataFrameSuite` and distribute them across more specific test suites and a new one, `DataFrameShowSuite`.
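    
    For illustration, the relocated `show`-related tests land in a suite with the same shape as the existing `DataFrame*Suite` classes. A condensed sketch, distilled from the diff below (the full tests live in `DataFrameShowSuite.scala`):
    
    ```scala
    package org.apache.spark.sql
    
    import org.apache.spark.sql.test.SharedSparkSession
    
    // Condensed sketch of the new suite: it mixes in QueryTest and
    // SharedSparkSession, like the suites the other tests move into.
    class DataFrameShowSuite extends QueryTest with SharedSparkSession {
      import testImplicits._
    
      test("showString: minimum column width") {
        val df = Seq((1, 1), (2, 2)).toDF()
        // showString is the package-private helper behind Dataset.show();
        // the actual test asserts the full expected table.
        assert(df.showString(10).contains("| _1| _2|"))
      }
    }
    ```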
    
    ### Why are the changes needed?
    1. Improve the maintainability of `DataFrameSuite`.
    2. Speed up execution of the test suite: execution time dropped to under 1 minute.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the modified test suites.
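    
    For example, assuming the standard SBT workflow in the Spark repo, an individual suite can be run with a command like `build/sbt "sql/testOnly org.apache.spark.sql.DataFrameShowSuite"`.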
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45392 from MaxGekk/split-DataFrameSuite.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  109 ++
 .../spark/sql/DataFrameComplexTypeSuite.scala      |  232 +++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  |   19 +
 .../apache/spark/sql/DataFrameSelfJoinSuite.scala  |   10 +
 .../org/apache/spark/sql/DataFrameShowSuite.scala  |  487 ++++++++
 .../org/apache/spark/sql/DataFrameStatSuite.scala  |   65 ++
 .../org/apache/spark/sql/DataFrameSuite.scala      | 1202 +-------------------
 .../spark/sql/StatisticsCollectionSuite.scala      |  125 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |   93 +-
 .../execution/datasources/DataSourceSuite.scala    |   49 +
 .../sql/execution/datasources/json/JsonSuite.scala |   55 +
 11 files changed, 1246 insertions(+), 1200 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ec589fa77241..21d7156a62b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -2200,6 +2200,115 @@ class DataFrameAggregateSuite extends QueryTest
       checkAnswer(df, Row(1, 2, 2) :: Row(3, 1, 1) :: Nil)
     }
   }
+
+  private def assertDecimalSumOverflow(
+      df: DataFrame, ansiEnabled: Boolean, expectedAnswer: Row): Unit = {
+    if (!ansiEnabled) {
+      checkAnswer(df, expectedAnswer)
+    } else {
+      val e = intercept[ArithmeticException] {
+        df.collect()
+      }
+      assert(e.getMessage.contains("cannot be represented as Decimal") ||
+        e.getMessage.contains("Overflow in sum of decimals"))
+    }
+  }
+
+  def checkAggResultsForDecimalOverflow(aggFn: Column => Column): Unit = {
+    Seq("true", "false").foreach { wholeStageEnabled =>
+      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
+        Seq(true, false).foreach { ansiEnabled =>
+          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
+            val df0 = Seq(
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
+            val df1 = Seq(
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2),
+              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
+            val df = df0.union(df1)
+            val df2 = df.withColumnRenamed("decNum", "decNum2").
+              join(df, "intNum").agg(aggFn($"decNum"))
+
+            val expectedAnswer = Row(null)
+            assertDecimalSumOverflow(df2, ansiEnabled, expectedAnswer)
+
+            val decStr = "1" + "0" * 19
+            val d1 = spark.range(0, 12, 1, 1)
+            val d2 = d1.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(aggFn($"d"))
+            assertDecimalSumOverflow(d2, ansiEnabled, expectedAnswer)
+
+            val d3 = spark.range(0, 1, 1, 1).union(spark.range(0, 11, 1, 1))
+            val d4 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(aggFn($"d"))
+            assertDecimalSumOverflow(d4, ansiEnabled, expectedAnswer)
+
+            val d5 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d"),
+              lit(1).as("key")).groupBy("key").agg(aggFn($"d").alias("aggd")).select($"aggd")
+            assertDecimalSumOverflow(d5, ansiEnabled, expectedAnswer)
+
+            val nullsDf = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d"))
+
+            val largeDecimals = Seq(BigDecimal("1"* 20 + ".123"), BigDecimal("9"* 20 + ".123")).
+              toDF("d")
+            assertDecimalSumOverflow(
+              nullsDf.union(largeDecimals).agg(aggFn($"d")), ansiEnabled, expectedAnswer)
+
+            val df3 = Seq(
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("50000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
+
+            val df4 = Seq(
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
+
+            val df5 = Seq(
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("10000000000000000000"), 1),
+              (BigDecimal("20000000000000000000"), 2)).toDF("decNum", "intNum")
+
+            val df6 = df3.union(df4).union(df5)
+            val df7 = df6.groupBy("intNum").agg(sum("decNum"), countDistinct("decNum")).
+              filter("intNum == 1")
+            assertDecimalSumOverflow(df7, ansiEnabled, Row(1, null, 2))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-28067: Aggregate sum should not return wrong results for decimal 
overflow") {
+    checkAggResultsForDecimalOverflow(c => sum(c))
+  }
+
+  test("SPARK-35955: Aggregate avg should not return wrong results for decimal 
overflow") {
+    checkAggResultsForDecimalOverflow(c => avg(c))
+  }
+
+  test("SPARK-28224: Aggregate sum big decimal overflow") {
+    val largeDecimals = spark.sparkContext.parallelize(
+      DecimalData(BigDecimal("1"* 20 + ".123"), BigDecimal("1"* 20 + ".123")) ::
+        DecimalData(BigDecimal("9"* 20 + ".123"), BigDecimal("9"* 20 + ".123")) :: Nil).toDF()
+
+    Seq(true, false).foreach { ansiEnabled =>
+      withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
+        val structDf = largeDecimals.select("a").agg(sum("a"))
+        assertDecimalSumOverflow(structDf, ansiEnabled, Row(null))
+      }
+    }
+  }
+
+  test("SPARK-32761: aggregating multiple distinct CONSTANT columns") {
+     checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1, 1))
+  }
 }
 
 case class B(c: Option[Double])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
index 81a797cc71bc..d982a000ad37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
@@ -17,13 +17,20 @@
 
 package org.apache.spark.sql
 
+import java.sql.{Date, Timestamp}
+
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.{ArrayType, BooleanType, Decimal, DoubleType, IntegerType, MapType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
 * A test suite to test DataFrame/SQL functionalities with complex types (i.e. array, struct, map).
@@ -86,6 +93,229 @@ class DataFrameComplexTypeSuite extends QueryTest with SharedSparkSession {
       assert(result === Row(Seq(Seq(Row(1)), Seq(Row(2)), Seq(Row(3)))) :: Nil)
     }
   }
+
+  test("access complex data") {
+    assert(complexData.filter(complexData("a").getItem(0) === 2).count() == 1)
+    if (!conf.ansiEnabled) {
+      assert(complexData.filter(complexData("m").getItem("1") === 1).count() 
== 1)
+    }
+    assert(complexData.filter(complexData("s").getField("key") === 1).count() 
== 1)
+  }
+
+  test("SPARK-7133: Implement struct, array, and map field accessor") {
+    assert(complexData.filter(complexData("a")(0) === 2).count() == 1)
+    if (!conf.ansiEnabled) {
+      assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
+    }
+    assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
+    assert(complexData.filter(complexData("m")(complexData("s")("value")) === 
1).count() == 1)
+    assert(complexData.filter(complexData("a")(complexData("s")("key")) === 
1).count() == 1)
+  }
+
+  test("SPARK-24313: access map with binary keys") {
+    val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
+    checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
+  }
+
+  test("SPARK-37855: IllegalStateException when transforming an array inside a 
nested struct") {
+    def makeInput(): DataFrame = {
+      val innerElement1 = Row(3, 3.12)
+      val innerElement2 = Row(4, 2.1)
+      val innerElement3 = Row(1, 985.2)
+      val innerElement4 = Row(10, 757548.0)
+      val innerElement5 = Row(1223, 0.665)
+
+      val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
+      val outerElement2 = Row(2, Row(List(innerElement3)))
+      val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
+
+      val data = Seq(
+        Row("row1", List(outerElement1)),
+        Row("row2", List(outerElement2, outerElement3))
+      )
+
+      val schema = new StructType()
+        .add("name", StringType)
+        .add("outer_array", ArrayType(new StructType()
+          .add("id", IntegerType)
+          .add("inner_array_struct", new StructType()
+            .add("inner_array", ArrayType(new StructType()
+              .add("id", IntegerType)
+              .add("value", DoubleType)
+            ))
+          )
+        ))
+
+      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    }
+
+    val df = makeInput().limit(2)
+
+    val res = df.withColumn("extracted", transform(
+      col("outer_array"),
+      c1 => {
+        struct(
+          c1.getField("id").alias("outer_id"),
+          transform(
+            c1.getField("inner_array_struct").getField("inner_array"),
+            c2 => {
+              struct(
+                c2.getField("value").alias("inner_value")
+              )
+            }
+          )
+        )
+      }
+    ))
+
+    assert(res.collect().length == 2)
+  }
+
+  test("SPARK-39293: The accumulator of ArrayAggregate to handle complex types 
properly") {
+    val reverse = udf((s: String) => s.reverse)
+
+    val df = Seq(Array("abc", "def")).toDF("array")
+    val testArray = df.select(
+      aggregate(
+        col("array"),
+        array().cast("array<string>"),
+        (acc, s) => concat(acc, array(reverse(s)))))
+    checkAnswer(testArray, Row(Array("cba", "fed")) :: Nil)
+
+    val testMap = df.select(
+      aggregate(
+        col("array"),
+        map().cast("map<string, string>"),
+        (acc, s) => map_concat(acc, map(s, reverse(s)))))
+    checkAnswer(testMap, Row(Map("abc" -> "cba", "def" -> "fed")) :: Nil)
+  }
+
+  test("SPARK-31552: array encoder with different types") {
+    // primitives
+    val booleans = Array(true, false)
+    checkAnswer(Seq(booleans).toDF(), Row(booleans))
+
+    val bytes = Array(1.toByte, 2.toByte)
+    checkAnswer(Seq(bytes).toDF(), Row(bytes))
+    val shorts = Array(1.toShort, 2.toShort)
+    checkAnswer(Seq(shorts).toDF(), Row(shorts))
+    val ints = Array(1, 2)
+    checkAnswer(Seq(ints).toDF(), Row(ints))
+    val longs = Array(1L, 2L)
+    checkAnswer(Seq(longs).toDF(), Row(longs))
+
+    val floats = Array(1.0F, 2.0F)
+    checkAnswer(Seq(floats).toDF(), Row(floats))
+    val doubles = Array(1.0D, 2.0D)
+    checkAnswer(Seq(doubles).toDF(), Row(doubles))
+
+    val strings = Array("2020-04-24", "2020-04-25")
+    checkAnswer(Seq(strings).toDF(), Row(strings))
+
+    // tuples
+    val decOne = Decimal(1, 38, 18)
+    val decTwo = Decimal(2, 38, 18)
+    val tuple1 = (1, 2.2, "3.33", decOne, Date.valueOf("2012-11-22"))
+    val tuple2 = (2, 3.3, "4.44", decTwo, Date.valueOf("2022-11-22"))
+    checkAnswer(Seq(Array(tuple1, tuple2)).toDF(), Seq(Seq(tuple1, tuple2)).toDF())
+
+    // case classes
+    val gbks = Array(GroupByKey(1, 2), GroupByKey(4, 5))
+    checkAnswer(Seq(gbks).toDF(), Row(Array(Row(1, 2), Row(4, 5))))
+
+    // We can move this implicit def to [[SQLImplicits]] when we eventually make fully
+    // support for array encoder like Seq and Set
+    // For now cases below, decimal/datetime/interval/binary/nested types, etc,
+    // are not supported by array
+    implicit def newArrayEncoder[T <: Array[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
+
+    // decimals
+    val decSpark = Array(decOne, decTwo)
+    val decScala = decSpark.map(_.toBigDecimal)
+    val decJava = decSpark.map(_.toJavaBigDecimal)
+    checkAnswer(Seq(decSpark).toDF(), Row(decJava))
+    checkAnswer(Seq(decScala).toDF(), Row(decJava))
+    checkAnswer(Seq(decJava).toDF(), Row(decJava))
+
+    // datetimes and intervals
+    val dates = strings.map(Date.valueOf)
+    checkAnswer(Seq(dates).toDF(), Row(dates))
+    val localDates = dates.map(d => DateTimeUtils.daysToLocalDate(DateTimeUtils.fromJavaDate(d)))
+    checkAnswer(Seq(localDates).toDF(), Row(dates))
+
+    val timestamps =
+      Array(Timestamp.valueOf("2020-04-24 12:34:56"), Timestamp.valueOf("2020-04-24 11:22:33"))
+    checkAnswer(Seq(timestamps).toDF(), Row(timestamps))
+    val instants =
+      timestamps.map(t => DateTimeUtils.microsToInstant(DateTimeUtils.fromJavaTimestamp(t)))
+    checkAnswer(Seq(instants).toDF(), Row(timestamps))
+
+    val intervals = Array(new CalendarInterval(1, 2, 3), new CalendarInterval(4, 5, 6))
+    checkAnswer(Seq(intervals).toDF(), Row(intervals))
+
+    // binary
+    val bins = Array(Array(1.toByte), Array(2.toByte), Array(3.toByte), Array(4.toByte))
+    checkAnswer(Seq(bins).toDF(), Row(bins))
+
+    // nested
+    val nestedIntArray = Array(Array(1), Array(2))
+    checkAnswer(Seq(nestedIntArray).toDF(), Row(nestedIntArray.map(wrapIntArray)))
+    val nestedDecArray = Array(decSpark)
+    checkAnswer(Seq(nestedDecArray).toDF(), Row(Array(wrapRefArray(decJava))))
+  }
+
+  test("SPARK-24165: CaseWhen/If - nullability of nested types") {
+    val rows = new java.util.ArrayList[Row]()
+    rows.add(Row(true, ("x", 1), Seq("x", "y"), Map(0 -> "x")))
+    rows.add(Row(false, (null, 2), Seq(null, "z"), Map(0 -> null)))
+    val schema = StructType(Seq(
+      StructField("cond", BooleanType, true),
+      StructField("s", StructType(Seq(
+        StructField("val1", StringType, true),
+        StructField("val2", IntegerType, false)
+      )), false),
+      StructField("a", ArrayType(StringType, true)),
+      StructField("m", MapType(IntegerType, StringType, true))
+    ))
+
+    val sourceDF = spark.createDataFrame(rows, schema)
+
+    def structWhenDF: DataFrame = sourceDF
+      .select(when($"cond",
+        struct(lit("a").as("val1"), lit(10).as("val2"))).otherwise($"s") as 
"res")
+      .select($"res".getField("val1"))
+    def arrayWhenDF: DataFrame = sourceDF
+      .select(when($"cond", array(lit("a"), lit("b"))).otherwise($"a") as 
"res")
+      .select($"res".getItem(0))
+    def mapWhenDF: DataFrame = sourceDF
+      .select(when($"cond", map(lit(0), lit("a"))).otherwise($"m") as "res")
+      .select($"res".getItem(0))
+
+    def structIfDF: DataFrame = sourceDF
+      .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as "res")
+      .select($"res".getField("val1"))
+    def arrayIfDF: DataFrame = sourceDF
+      .select(expr("if(cond, array('a', 'b'), a)") as "res")
+      .select($"res".getItem(0))
+    def mapIfDF: DataFrame = sourceDF
+      .select(expr("if(cond, map(0, 'a'), m)") as "res")
+      .select($"res".getItem(0))
+
+    def checkResult(): Unit = {
+      checkAnswer(structWhenDF, Seq(Row("a"), Row(null)))
+      checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null)))
+      checkAnswer(mapWhenDF, Seq(Row("a"), Row(null)))
+      checkAnswer(structIfDF, Seq(Row("a"), Row(null)))
+      checkAnswer(arrayIfDF, Seq(Row("a"), Row(null)))
+      checkAnswer(mapIfDF, Seq(Row("a"), Row(null)))
+    }
+
+    // Test with local relation, the Project will be evaluated without codegen
+    checkResult()
+    // Test with cached relation, the Project will be evaluated with codegen
+    sourceDF.cache()
+    checkResult()
+  }
 }
 
 class S100(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4c6f34d3d2dd..01905e2c05fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -602,4 +602,23 @@ class DataFrameJoinSuite extends QueryTest
       )
     }
   }
+
+  test("SPARK-20359: catalyst outer join optimization should not throw npe") {
+    val df1 = Seq("a", "b", "c").toDF("x")
+      .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" 
}.apply($"x"))
+    val df2 = Seq("a", "b").toDF("x1")
+    df1
+      .join(df2, df1("x") === df2("x1"), "left_outer")
+      .filter($"x1".isNotNull || !$"y".isin("a!"))
+      .count()
+  }
+
+  test("SPARK-16181: outer join with isNull filter") {
+    val left = Seq("x").toDF("col")
+    val right = Seq("y").toDF("col").withColumn("new", lit(true))
+    val joined = left.join(right, left("col") === right("col"), "left_outer")
+
+    checkAnswer(joined, Row("x", null, null))
+    checkAnswer(joined.filter($"new".isNull), Row("x", null, null))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
index c777d2207584..7dc40549a17b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
@@ -488,4 +488,14 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
         context = ExpectedContext(fragment = "$", getCurrentClassCallSitePattern))
     }
   }
+
+  test("SPARK-20897: cached self-join should not fail") {
+    // force to plan sort merge join
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      val df = Seq(1 -> "a").toDF("i", "j")
+      val df1 = df.as("t1")
+      val df2 = df.as("t2")
+      assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameShowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameShowSuite.scala
new file mode 100644
index 000000000000..e889fe2545af
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameShowSuite.scala
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.functions.rand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DataFrameShowSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  ignore("show") {
+    // This test case is intended ignored, but to make sure it compiles correctly
+    testData.select($"*").show()
+    testData.select($"*").show(1000)
+  }
+
+
+  test("showString: truncate = [0, 20]") {
+    val longString = Array.fill(21)("1").mkString
+    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
+    val expectedAnswerForFalse = """+---------------------+
+                                   ||value                |
+                                   |+---------------------+
+                                   ||1                    |
+                                   ||111111111111111111111|
+                                   |+---------------------+
+                                   |""".stripMargin
+    assert(df.showString(10, truncate = 0) === expectedAnswerForFalse)
+    val expectedAnswerForTrue = """+--------------------+
+                                  ||               value|
+                                  |+--------------------+
+                                  ||                   1|
+                                  ||11111111111111111...|
+                                  |+--------------------+
+                                  |""".stripMargin
+    assert(df.showString(10, truncate = 20) === expectedAnswerForTrue)
+  }
+
+  test("showString: truncate = [0, 20], vertical = true") {
+    val longString = Array.fill(21)("1").mkString
+    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
+    val expectedAnswerForFalse = "-RECORD 0----------------------\n" +
+                                 " value | 1                     \n" +
+                                 "-RECORD 1----------------------\n" +
+                                 " value | 111111111111111111111 \n"
+    assert(df.showString(10, truncate = 0, vertical = true) === expectedAnswerForFalse)
+    val expectedAnswerForTrue = "-RECORD 0---------------------\n" +
+                                " value | 1                    \n" +
+                                "-RECORD 1---------------------\n" +
+                                " value | 11111111111111111... \n"
+    assert(df.showString(10, truncate = 20, vertical = true) === expectedAnswerForTrue)
+  }
+
+  test("showString: truncate = [3, 17]") {
+    val longString = Array.fill(21)("1").mkString
+    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
+    val expectedAnswerForFalse = """+-----+
+                                   ||value|
+                                   |+-----+
+                                   ||    1|
+                                   ||  111|
+                                   |+-----+
+                                   |""".stripMargin
+    assert(df.showString(10, truncate = 3) === expectedAnswerForFalse)
+    val expectedAnswerForTrue = """+-----------------+
+                                  ||            value|
+                                  |+-----------------+
+                                  ||                1|
+                                  ||11111111111111...|
+                                  |+-----------------+
+                                  |""".stripMargin
+    assert(df.showString(10, truncate = 17) === expectedAnswerForTrue)
+  }
+
+  test("showString: truncate = [3, 17], vertical = true") {
+    val longString = Array.fill(21)("1").mkString
+    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
+    val expectedAnswerForFalse = "-RECORD 0----\n" +
+                                 " value | 1   \n" +
+                                 "-RECORD 1----\n" +
+                                 " value | 111 \n"
+    assert(df.showString(10, truncate = 3, vertical = true) === expectedAnswerForFalse)
+    val expectedAnswerForTrue = "-RECORD 0------------------\n" +
+                                " value | 1                 \n" +
+                                "-RECORD 1------------------\n" +
+                                " value | 11111111111111... \n"
+    assert(df.showString(10, truncate = 17, vertical = true) === expectedAnswerForTrue)
+  }
+
+  test("showString(negative)") {
+    val expectedAnswer = """+---+-----+
+                           ||key|value|
+                           |+---+-----+
+                           |+---+-----+
+                           |only showing top 0 rows
+                           |""".stripMargin
+    assert(testData.select($"*").showString(-1) === expectedAnswer)
+  }
+
+  test("showString(negative), vertical = true") {
+    val expectedAnswer = "(0 rows)\n"
+    assert(testData.select($"*").showString(-1, vertical = true) === 
expectedAnswer)
+  }
+
+  test("showString(0)") {
+    val expectedAnswer = """+---+-----+
+                           ||key|value|
+                           |+---+-----+
+                           |+---+-----+
+                           |only showing top 0 rows
+                           |""".stripMargin
+    assert(testData.select($"*").showString(0) === expectedAnswer)
+  }
+
+  test("showString(Int.MaxValue)") {
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    val expectedAnswer = """+---+---+
+                           ||  a|  b|
+                           |+---+---+
+                           ||  1|  2|
+                           ||  3|  4|
+                           |+---+---+
+                           |""".stripMargin
+    assert(df.showString(Int.MaxValue) === expectedAnswer)
+  }
+
+  test("showString(0), vertical = true") {
+    val expectedAnswer = "(0 rows)\n"
+    assert(testData.select($"*").showString(0, vertical = true) === 
expectedAnswer)
+  }
+
+  test("showString: array") {
+    val df = Seq(
+      (Array(1, 2, 3), Array(1, 2, 3)),
+      (Array(2, 3, 4), Array(2, 3, 4))
+    ).toDF()
+    val expectedAnswer = """+---------+---------+
+                           ||       _1|       _2|
+                           |+---------+---------+
+                           ||[1, 2, 3]|[1, 2, 3]|
+                           ||[2, 3, 4]|[2, 3, 4]|
+                           |+---------+---------+
+                           |""".stripMargin
+    assert(df.showString(10) === expectedAnswer)
+  }
+
+  test("showString: array, vertical = true") {
+    val df = Seq(
+      (Array(1, 2, 3), Array(1, 2, 3)),
+      (Array(2, 3, 4), Array(2, 3, 4))
+    ).toDF()
+    val expectedAnswer = "-RECORD 0--------\n" +
+                         " _1  | [1, 2, 3] \n" +
+                         " _2  | [1, 2, 3] \n" +
+                         "-RECORD 1--------\n" +
+                         " _1  | [2, 3, 4] \n" +
+                         " _2  | [2, 3, 4] \n"
+    assert(df.showString(10, vertical = true) === expectedAnswer)
+  }
+
+  test("showString: binary") {
+    val df = Seq(
+      ("12".getBytes(StandardCharsets.UTF_8), 
"ABC.".getBytes(StandardCharsets.UTF_8)),
+      ("34".getBytes(StandardCharsets.UTF_8), 
"12346".getBytes(StandardCharsets.UTF_8))
+    ).toDF()
+    val expectedAnswer = """+-------+----------------+
+                           ||     _1|              _2|
+                           |+-------+----------------+
+                           ||[31 32]|   [41 42 43 2E]|
+                           ||[33 34]|[31 32 33 34 36]|
+                           |+-------+----------------+
+                           |""".stripMargin
+    assert(df.showString(10) === expectedAnswer)
+  }
+
+  test("showString: binary, vertical = true") {
+    val df = Seq(
+      ("12".getBytes(StandardCharsets.UTF_8), 
"ABC.".getBytes(StandardCharsets.UTF_8)),
+      ("34".getBytes(StandardCharsets.UTF_8), 
"12346".getBytes(StandardCharsets.UTF_8))
+    ).toDF()
+    val expectedAnswer = "-RECORD 0---------------\n" +
+                         " _1  | [31 32]          \n" +
+                         " _2  | [41 42 43 2E]    \n" +
+                         "-RECORD 1---------------\n" +
+                         " _1  | [33 34]          \n" +
+                         " _2  | [31 32 33 34 36] \n"
+    assert(df.showString(10, vertical = true) === expectedAnswer)
+  }
+
+  test("showString: minimum column width") {
+    val df = Seq(
+      (1, 1),
+      (2, 2)
+    ).toDF()
+    val expectedAnswer = """+---+---+
+                           || _1| _2|
+                           |+---+---+
+                           ||  1|  1|
+                           ||  2|  2|
+                           |+---+---+
+                           |""".stripMargin
+    assert(df.showString(10) === expectedAnswer)
+  }
+
+  test("showString: minimum column width, vertical = true") {
+    val df = Seq(
+      (1, 1),
+      (2, 2)
+    ).toDF()
+    val expectedAnswer = "-RECORD 0--\n" +
+                         " _1  | 1   \n" +
+                         " _2  | 1   \n" +
+                         "-RECORD 1--\n" +
+                         " _1  | 2   \n" +
+                         " _2  | 2   \n"
+    assert(df.showString(10, vertical = true) === expectedAnswer)
+  }
+
+  test("SPARK-33690: showString: escape meta-characters") {
+    val df1 = spark.sql("SELECT 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh'")
+    assert(df1.showString(1, truncate = 0) ===
+      """+--------------------------------------+
+        ||aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh|
+        |+--------------------------------------+
+        ||aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh|
+        |+--------------------------------------+
+        |""".stripMargin)
+
+    val df2 = spark.sql("SELECT 
array('aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    assert(df2.showString(1, truncate = 0) ===
+      """+---------------------------------------------+
+        ||array(aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
+        |+---------------------------------------------+
+        ||[aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh]     |
+        |+---------------------------------------------+
+        |""".stripMargin)
+
+    val df3 =
+      spark.sql("SELECT map('aaa\nbbb\tccc', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    assert(df3.showString(1, truncate = 0) ===
+      """+----------------------------------------------------------+
+        ||map(aaa\nbbb\tccc, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
+        |+----------------------------------------------------------+
+        ||{aaa\nbbb\tccc -> aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh} |
+        |+----------------------------------------------------------+
+        |""".stripMargin)
+
+    val df4 =
+      spark.sql("SELECT named_struct('v', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    assert(df4.showString(1, truncate = 0) ===
+      """+-------------------------------------------------------+
+        ||named_struct(v, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
+        |+-------------------------------------------------------+
+        ||{aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh}               |
+        |+-------------------------------------------------------+
+        |""".stripMargin)
+  }
+
+  test("SPARK-7319 showString") {
+    val expectedAnswer = """+---+-----+
+                           ||key|value|
+                           |+---+-----+
+                           ||  1|    1|
+                           |+---+-----+
+                           |only showing top 1 row
+                           |""".stripMargin
+    assert(testData.select($"*").showString(1) === expectedAnswer)
+  }
+
+  test("SPARK-7319 showString, vertical = true") {
+    val expectedAnswer = "-RECORD 0----\n" +
+                         " key   | 1   \n" +
+                         " value | 1   \n" +
+                         "only showing top 1 row\n"
+    assert(testData.select($"*").showString(1, vertical = true) === 
expectedAnswer)
+  }
+
+  test("SPARK-23023 Cast rows to strings in showString") {
+    val df1 = Seq(Seq(1, 2, 3, 4)).toDF("a")
+    assert(df1.showString(10) ===
+      s"""+------------+
+         ||           a|
+         |+------------+
+         ||[1, 2, 3, 4]|
+         |+------------+
+         |""".stripMargin)
+    val df2 = Seq(Map(1 -> "a", 2 -> "b")).toDF("a")
+    assert(df2.showString(10) ===
+      s"""+----------------+
+         ||               a|
+         |+----------------+
+         ||{1 -> a, 2 -> b}|
+         |+----------------+
+         |""".stripMargin)
+    val df3 = Seq(((1, "a"), 0), ((2, "b"), 0)).toDF("a", "b")
+    assert(df3.showString(10) ===
+      s"""+------+---+
+         ||     a|  b|
+         |+------+---+
+         ||{1, a}|  0|
+         ||{2, b}|  0|
+         |+------+---+
+         |""".stripMargin)
+  }
+
+  test("SPARK-7327 show with empty dataFrame") {
+    val expectedAnswer = """+---+-----+
+                           ||key|value|
+                           |+---+-----+
+                           |+---+-----+
+                           |""".stripMargin
+    assert(testData.select($"*").filter($"key" < 0).showString(1) === 
expectedAnswer)
+  }
+
+  test("SPARK-7327 show with empty dataFrame, vertical = true") {
+    assert(testData.select($"*").filter($"key" < 0).showString(1, vertical = 
true) === "(0 rows)\n")
+  }
+
+  test("SPARK-18350 show with session local timezone") {
+    val d = Date.valueOf("2016-12-01")
+    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
+    val df = Seq((d, ts)).toDF("d", "ts")
+    val expectedAnswer = """+----------+-------------------+
+                           ||d         |ts                 |
+                           |+----------+-------------------+
+                           ||2016-12-01|2016-12-01 00:00:00|
+                           |+----------+-------------------+
+                           |""".stripMargin
+    assert(df.showString(1, truncate = 0) === expectedAnswer)
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+
+      val expectedAnswer = """+----------+-------------------+
+                             ||d         |ts                 |
+                             |+----------+-------------------+
+                             ||2016-12-01|2016-12-01 08:00:00|
+                             |+----------+-------------------+
+                             |""".stripMargin
+      assert(df.showString(1, truncate = 0) === expectedAnswer)
+    }
+  }
+
+  test("SPARK-18350 show with session local timezone, vertical = true") {
+    val d = Date.valueOf("2016-12-01")
+    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
+    val df = Seq((d, ts)).toDF("d", "ts")
+    val expectedAnswer = "-RECORD 0------------------\n" +
+                         " d   | 2016-12-01          \n" +
+                         " ts  | 2016-12-01 00:00:00 \n"
+    assert(df.showString(1, truncate = 0, vertical = true) === expectedAnswer)
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+
+      val expectedAnswer = "-RECORD 0------------------\n" +
+                           " d   | 2016-12-01          \n" +
+                           " ts  | 2016-12-01 08:00:00 \n"
+      assert(df.showString(1, truncate = 0, vertical = true) === expectedAnswer)
+    }
+  }
+
+  test("SPARK-8608: call `show` on local DataFrame with random columns should 
return same value") {
+    val df = testData.select(rand(33))
+    assert(df.showString(5) == df.showString(5))
+
+    // We will reuse the same Expression object for LocalRelation.
+    val df1 = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
+    assert(df1.showString(5) == df1.showString(5))
+  }
+
+  test("dataframe toString") {
+    assert(testData.toString === "[key: int, value: string]")
+    assert(testData("key").toString === "key")
+    assert($"test".toString === "test")
+  }
+
+  test("SPARK-12398 truncated toString") {
+    val df1 = Seq((1L, "row1")).toDF("id", "name")
+    assert(df1.toString() === "[id: bigint, name: string]")
+
+    val df2 = Seq((1L, "c2", false)).toDF("c1", "c2", "c3")
+    assert(df2.toString === "[c1: bigint, c2: string ... 1 more field]")
+
+    val df3 = Seq((1L, "c2", false, 10)).toDF("c1", "c2", "c3", "c4")
+    assert(df3.toString === "[c1: bigint, c2: string ... 2 more fields]")
+
+    val df4 = Seq((1L, Tuple2(1L, "val"))).toDF("c1", "c2")
+    assert(df4.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string>]")
+
+    val df5 = Seq((1L, Tuple2(1L, "val"), 20.0)).toDF("c1", "c2", "c3")
+    assert(df5.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string> ... 1 more field]")
+
+    val df6 = Seq((1L, Tuple2(1L, "val"), 20.0, 1)).toDF("c1", "c2", "c3", "c4")
+    assert(df6.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string> ... 2 more fields]")
+
+    val df7 = Seq((1L, Tuple3(1L, "val", 2), 20.0, 1)).toDF("c1", "c2", "c3", "c4")
+    assert(
+      df7.toString ===
+        "[c1: bigint, c2: struct<_1: bigint, _2: string ... 1 more field> ... 
2 more fields]")
+
+    val df8 = Seq((1L, Tuple7(1L, "val", 2, 3, 4, 5, 6), 20.0, 1)).toDF("c1", "c2", "c3", "c4")
+    assert(
+      df8.toString ===
+        "[c1: bigint, c2: struct<_1: bigint, _2: string ... 5 more fields> ... 
2 more fields]")
+
+    val df9 =
+      Seq((1L, Tuple4(1L, Tuple4(1L, 2L, 3L, 4L), 2L, 3L), 20.0, 1)).toDF("c1", "c2", "c3", "c4")
+    assert(
+      df9.toString ===
+        "[c1: bigint, c2: struct<_1: bigint," +
+          " _2: struct<_1: bigint," +
+          " _2: bigint ... 2 more fields> ... 2 more fields> ... 2 more 
fields]")
+
+  }
+
+  test("SPARK-34308: printSchema: escape meta-characters") {
+    val captured = new ByteArrayOutputStream()
+
+    val df1 = spark.sql("SELECT 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh'")
+    Console.withOut(captured) {
+      df1.printSchema()
+    }
+    assert(captured.toString ===
+      """root
+        | |-- aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh: string (nullable = false)
+        |
+        |""".stripMargin)
+    captured.reset()
+
+    val df2 = spark.sql("SELECT 
array('aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    Console.withOut(captured) {
+      df2.printSchema()
+    }
+    assert(captured.toString ===
+      """root
+        | |-- array(aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): array (nullable = false)
+        | |    |-- element: string (containsNull = false)
+        |
+        |""".stripMargin)
+    captured.reset()
+
+    val df3 =
+      spark.sql("SELECT map('aaa\nbbb\tccc', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    Console.withOut(captured) {
+      df3.printSchema()
+    }
+    assert(captured.toString ===
+      """root
+        | |-- map(aaa\nbbb\tccc, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): map (nullable = false)
+        | |    |-- key: string
+        | |    |-- value: string (valueContainsNull = false)
+        |
+        |""".stripMargin)
+    captured.reset()
+
+    val df4 =
+      spark.sql("SELECT named_struct('v', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
+    Console.withOut(captured) {
+      df4.printSchema()
+    }
+    assert(captured.toString ===
+      """root
+        | |-- named_struct(v, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): struct (nullable = false)
+        | |    |-- v: string (nullable = false)
+        |
+        |""".stripMargin)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 8b53edb0d3c1..8eee8fc37661 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -21,6 +21,7 @@ import java.util.Random
 
 import org.scalatest.matchers.must.Matchers._
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.functions.{col, lit, struct, when}
@@ -540,6 +541,70 @@ class DataFrameStatSuite extends QueryTest with SharedSparkSession {
     assert(filter4.bitSize() == 64 * 5)
     assert(0.until(1000).forall(i => filter4.mightContain(i * 3)))
   }
+
+  test("SPARK-34165: Add count_distinct to summary") {
+    val person3: DataFrame = Seq(
+      ("Luis", 1, 99),
+      ("Luis", 16, 99),
+      ("Luis", 16, 176),
+      ("Fernando", 32, 99),
+      ("Fernando", 32, 164),
+      ("David", 60, 99),
+      ("Amy", 24, 99)).toDF("name", "age", "height")
+    val summaryDF = person3.summary("count", "count_distinct")
+
+    val summaryResult = Seq(
+      Row("count", "7", "7", "7"),
+      Row("count_distinct", "4", "5", "3"))
+
+    def getSchemaAsSeq(df: DataFrame): Seq[String] = df.schema.map(_.name)
+    assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height"))
+    checkAnswer(summaryDF, summaryResult)
+
+    val approxSummaryDF = person3.summary("count", "approx_count_distinct")
+    val approxSummaryResult = Seq(
+      Row("count", "7", "7", "7"),
+      Row("approx_count_distinct", "4", "5", "3"))
+    assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height"))
+    checkAnswer(approxSummaryDF, approxSummaryResult)
+  }
+
+  test("summary advanced") {
+    import org.apache.spark.util.ArrayImplicits._
+    val person2: DataFrame = Seq(
+      ("Bob", 16, 176),
+      ("Alice", 32, 164),
+      ("David", 60, 192),
+      ("Amy", 24, 180)).toDF("name", "age", "height")
+
+    val stats = Array("count", "50.01%", "max", "mean", "min", "25%")
+    val orderMatters = person2.summary(stats.toImmutableArraySeq: _*)
+    assert(orderMatters.collect().map(_.getString(0)) === stats)
+
+    val onlyPercentiles = person2.summary("0.1%", "99.9%")
+    assert(onlyPercentiles.count() === 2)
+
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        person2.summary("foo")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2114",
+      parameters = Map("stats" -> "foo")
+    )
+
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        person2.summary("foo%")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2113",
+      parameters = Map("stats" -> "foo%")
+    )
+  }
+
+  test("SPARK-19691 Calculating percentile of decimal column fails with 
ClassCastException") {
+    val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as 
x").selectExpr("percentile(x, 0.5)")
+    checkAnswer(df, Row(BigDecimal(0)) :: Nil)
+  }
 }
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 8ef95e6fd129..24a548fa7f81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -20,28 +20,22 @@ package org.apache.spark.sql
 import java.io.{ByteArrayOutputStream, File}
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
-import java.sql.{Date, Timestamp}
-import java.util.{Locale, UUID}
-import java.util.concurrent.atomic.AtomicLong
+import java.util.Locale
 
 import scala.collection.immutable.ListMap
-import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
 
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.{SparkException, SparkIllegalArgumentException}
+import org.apache.spark.SparkException
 import org.apache.spark.api.python.PythonEvalType
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery, Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery, Uuid}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LocalRelation, LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -51,12 +45,11 @@ import org.apache.spark.sql.expressions.{Aggregator, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
-import org.apache.spark.sql.test.SQLTestData.{ArrayStringWrapper, ContainerStringWrapper, DecimalData, StringWrapper, TestData2}
+import org.apache.spark.sql.test.SQLTestData.{ArrayStringWrapper, ContainerStringWrapper, StringWrapper, TestData2}
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.Utils
 
 @SlowSQLTest
 class DataFrameSuite extends QueryTest
@@ -77,12 +70,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("dataframe toString") {
-    assert(testData.toString === "[key: int, value: string]")
-    assert(testData("key").toString === "key")
-    assert($"test".toString === "test")
-  }
-
   test("rename nested groupby") {
     val df = Seq((1, (1, 1))).toDF()
 
@@ -91,14 +78,6 @@ class DataFrameSuite extends QueryTest
       Row(1, 1) :: Nil)
   }
 
-  test("access complex data") {
-    assert(complexData.filter(complexData("a").getItem(0) === 2).count() == 1)
-    if (!conf.ansiEnabled) {
-      assert(complexData.filter(complexData("m").getItem("1") === 1).count() 
== 1)
-    }
-    assert(complexData.filter(complexData("s").getField("key") === 1).count() 
== 1)
-  }
-
   test("table scan") {
     checkAnswer(
       testData,
@@ -204,32 +183,6 @@ class DataFrameSuite extends QueryTest
       structDf.select(xxhash64($"a", $"record.*")))
   }
 
-  private def assertDecimalSumOverflow(
-      df: DataFrame, ansiEnabled: Boolean, expectedAnswer: Row): Unit = {
-    if (!ansiEnabled) {
-      checkAnswer(df, expectedAnswer)
-    } else {
-      val e = intercept[ArithmeticException] {
-        df.collect()
-      }
-      assert(e.getMessage.contains("cannot be represented as Decimal") ||
-        e.getMessage.contains("Overflow in sum of decimals"))
-    }
-  }
-
-  test("SPARK-28224: Aggregate sum big decimal overflow") {
-    val largeDecimals = spark.sparkContext.parallelize(
-      DecimalData(BigDecimal("1"* 20 + ".123"), BigDecimal("1"* 20 + ".123")) ::
-        DecimalData(BigDecimal("9"* 20 + ".123"), BigDecimal("9"* 20 + ".123")) :: Nil).toDF()
-
-    Seq(true, false).foreach { ansiEnabled =>
-      withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
-        val structDf = largeDecimals.select("a").agg(sum("a"))
-        assertDecimalSumOverflow(structDf, ansiEnabled, Row(null))
-      }
-    }
-  }
-
   test("SPARK-28067: sum of null decimal values") {
     Seq("true", "false").foreach { wholeStageEnabled =>
       withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
@@ -243,85 +196,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  def checkAggResultsForDecimalOverflow(aggFn: Column => Column): Unit = {
-    Seq("true", "false").foreach { wholeStageEnabled =>
-      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
-        Seq(true, false).foreach { ansiEnabled =>
-          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
-            val df0 = Seq(
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
-            val df1 = Seq(
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2),
-              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
-            val df = df0.union(df1)
-            val df2 = df.withColumnRenamed("decNum", "decNum2").
-              join(df, "intNum").agg(aggFn($"decNum"))
-
-            val expectedAnswer = Row(null)
-            assertDecimalSumOverflow(df2, ansiEnabled, expectedAnswer)
-
-            val decStr = "1" + "0" * 19
-            val d1 = spark.range(0, 12, 1, 1)
-            val d2 = d1.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(aggFn($"d"))
-            assertDecimalSumOverflow(d2, ansiEnabled, expectedAnswer)
-
-            val d3 = spark.range(0, 1, 1, 1).union(spark.range(0, 11, 1, 1))
-            val d4 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(aggFn($"d"))
-            assertDecimalSumOverflow(d4, ansiEnabled, expectedAnswer)
-
-            val d5 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d"),
-              lit(1).as("key")).groupBy("key").agg(aggFn($"d").alias("aggd")).select($"aggd")
-            assertDecimalSumOverflow(d5, ansiEnabled, expectedAnswer)
-
-            val nullsDf = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d"))
-
-            val largeDecimals = Seq(BigDecimal("1"* 20 + ".123"), BigDecimal("9"* 20 + ".123")).
-              toDF("d")
-            assertDecimalSumOverflow(
-              nullsDf.union(largeDecimals).agg(aggFn($"d")), ansiEnabled, expectedAnswer)
-
-            val df3 = Seq(
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("50000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
-
-            val df4 = Seq(
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
-
-            val df5 = Seq(
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("10000000000000000000"), 1),
-              (BigDecimal("20000000000000000000"), 2)).toDF("decNum", "intNum")
-
-            val df6 = df3.union(df4).union(df5)
-            val df7 = df6.groupBy("intNum").agg(sum("decNum"), countDistinct("decNum")).
-              filter("intNum == 1")
-            assertDecimalSumOverflow(df7, ansiEnabled, Row(1, null, 2))
-          }
-        }
-      }
-    }
-  }
-
-  test("SPARK-28067: Aggregate sum should not return wrong results for decimal 
overflow") {
-    checkAggResultsForDecimalOverflow(c => sum(c))
-  }
-
-  test("SPARK-35955: Aggregate avg should not return wrong results for decimal 
overflow") {
-    checkAggResultsForDecimalOverflow(c => avg(c))
-  }
-
   test("Star Expansion - ds.explode should fail with a meaningful message if 
it takes a star") {
     val df = Seq(("1", "1,2"), ("2", "4"), ("3", "7,8,9")).toDF("prefix", 
"csv")
     val e = intercept[AnalysisException] {
@@ -663,24 +537,6 @@ class DataFrameSuite extends QueryTest
       testData.take(15).drop(5).toSeq)
   }
 
-  test("udf") {
-    val foo = udf((a: Int, b: String) => a.toString + b)
-
-    checkAnswer(
-      // SELECT *, foo(key, value) FROM testData
-      testData.select($"*", foo($"key", $"value")).limit(3),
-      Row(1, "1", "11") :: Row(2, "2", "22") :: Row(3, "3", "33") :: Nil
-    )
-  }
-
-  test("callUDF without Hive Support") {
-    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
-    df.sparkSession.udf.register("simpleUDF", (v: Int) => v * v)
-    checkAnswer(
-      df.select($"id", callUDF("simpleUDF", $"value")), // test deprecated one
-      Row("id1", 1) :: Row("id2", 16) :: Row("id3", 25) :: Nil)
-  }
-
   test("withColumn") {
     val df = testData.toDF().withColumn("newCol", col("key") + 1)
     checkAnswer(
@@ -1048,15 +904,6 @@ class DataFrameSuite extends QueryTest
     ("David", 60, 192),
     ("Amy", 24, 180)).toDF("name", "age", "height")
 
-  private lazy val person3: DataFrame = Seq(
-    ("Luis", 1, 99),
-    ("Luis", 16, 99),
-    ("Luis", 16, 176),
-    ("Fernando", 32, 99),
-    ("Fernando", 32, 164),
-    ("David", 60, 99),
-    ("Amy", 24, 99)).toDF("name", "age", "height")
-
   test("describe") {
     val describeResult = Seq(
       Row("count", "4", "4", "4"),
@@ -1156,25 +1003,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-34165: Add count_distinct to summary") {
-    val summaryDF = person3.summary("count", "count_distinct")
-
-    val summaryResult = Seq(
-      Row("count", "7", "7", "7"),
-      Row("count_distinct", "4", "5", "3"))
-
-    def getSchemaAsSeq(df: DataFrame): Seq[String] = df.schema.map(_.name)
-    assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height"))
-    checkAnswer(summaryDF, summaryResult)
-
-    val approxSummaryDF = person3.summary("count", "approx_count_distinct")
-    val approxSummaryResult = Seq(
-      Row("count", "7", "7", "7"),
-      Row("approx_count_distinct", "4", "5", "3"))
-    assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height"))
-    checkAnswer(approxSummaryDF, approxSummaryResult)
-  }
-
   test("SPARK-41391: Correct the output column name of 
groupBy.agg(count_distinct)") {
     withTempView("person") {
       person.createOrReplaceTempView("person")
@@ -1187,32 +1015,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("summary advanced") {
-    import org.apache.spark.util.ArrayImplicits._
-    val stats = Array("count", "50.01%", "max", "mean", "min", "25%")
-    val orderMatters = person2.summary(stats.toImmutableArraySeq: _*)
-    assert(orderMatters.collect().map(_.getString(0)) === stats)
-
-    val onlyPercentiles = person2.summary("0.1%", "99.9%")
-    assert(onlyPercentiles.count() === 2)
-
-    checkError(
-      exception = intercept[SparkIllegalArgumentException] {
-        person2.summary("foo")
-      },
-      errorClass = "_LEGACY_ERROR_TEMP_2114",
-      parameters = Map("stats" -> "foo")
-    )
-
-    checkError(
-      exception = intercept[SparkIllegalArgumentException] {
-        person2.summary("foo%")
-      },
-      errorClass = "_LEGACY_ERROR_TEMP_2113",
-      parameters = Map("stats" -> "foo%")
-    )
-  }
-
   test("apply on query results (SPARK-5462)") {
     val df = testData.sparkSession.sql("select key from testData")
     checkAnswer(df.select(df("key")), testData.select("key").collect().toSeq)
@@ -1242,12 +1044,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  ignore("show") {
-    // This test case is intended ignored, but to make sure it compiles 
correctly
-    testData.select($"*").show()
-    testData.select($"*").show(1000)
-  }
-
   test("getRows: truncate = [0, 20]") {
     val longString = Array.fill(21)("1").mkString
     val df = sparkContext.parallelize(Seq("1", longString)).toDF()
@@ -1307,402 +1103,6 @@ class DataFrameSuite extends QueryTest
     assert(df.getRows(10, 20) === expectedAnswer)
   }
 
-  test("showString: truncate = [0, 20]") {
-    val longString = Array.fill(21)("1").mkString
-    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
-    val expectedAnswerForFalse = """+---------------------+
-                                   ||value                |
-                                   |+---------------------+
-                                   ||1                    |
-                                   ||111111111111111111111|
-                                   |+---------------------+
-                                   |""".stripMargin
-    assert(df.showString(10, truncate = 0) === expectedAnswerForFalse)
-    val expectedAnswerForTrue = """+--------------------+
-                                  ||               value|
-                                  |+--------------------+
-                                  ||                   1|
-                                  ||11111111111111111...|
-                                  |+--------------------+
-                                  |""".stripMargin
-    assert(df.showString(10, truncate = 20) === expectedAnswerForTrue)
-  }
-
-  test("showString: truncate = [0, 20], vertical = true") {
-    val longString = Array.fill(21)("1").mkString
-    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
-    val expectedAnswerForFalse = "-RECORD 0----------------------\n" +
-                                 " value | 1                     \n" +
-                                 "-RECORD 1----------------------\n" +
-                                 " value | 111111111111111111111 \n"
-    assert(df.showString(10, truncate = 0, vertical = true) === 
expectedAnswerForFalse)
-    val expectedAnswerForTrue = "-RECORD 0---------------------\n" +
-                                " value | 1                    \n" +
-                                "-RECORD 1---------------------\n" +
-                                " value | 11111111111111111... \n"
-    assert(df.showString(10, truncate = 20, vertical = true) === 
expectedAnswerForTrue)
-  }
-
-  test("showString: truncate = [3, 17]") {
-    val longString = Array.fill(21)("1").mkString
-    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
-    val expectedAnswerForFalse = """+-----+
-                                   ||value|
-                                   |+-----+
-                                   ||    1|
-                                   ||  111|
-                                   |+-----+
-                                   |""".stripMargin
-    assert(df.showString(10, truncate = 3) === expectedAnswerForFalse)
-    val expectedAnswerForTrue = """+-----------------+
-                                  ||            value|
-                                  |+-----------------+
-                                  ||                1|
-                                  ||11111111111111...|
-                                  |+-----------------+
-                                  |""".stripMargin
-    assert(df.showString(10, truncate = 17) === expectedAnswerForTrue)
-  }
-
-  test("showString: truncate = [3, 17], vertical = true") {
-    val longString = Array.fill(21)("1").mkString
-    val df = sparkContext.parallelize(Seq("1", longString)).toDF()
-    val expectedAnswerForFalse = "-RECORD 0----\n" +
-                                 " value | 1   \n" +
-                                 "-RECORD 1----\n" +
-                                 " value | 111 \n"
-    assert(df.showString(10, truncate = 3, vertical = true) === 
expectedAnswerForFalse)
-    val expectedAnswerForTrue = "-RECORD 0------------------\n" +
-                                " value | 1                 \n" +
-                                "-RECORD 1------------------\n" +
-                                " value | 11111111111111... \n"
-    assert(df.showString(10, truncate = 17, vertical = true) === 
expectedAnswerForTrue)
-  }
-
-  test("showString(negative)") {
-    val expectedAnswer = """+---+-----+
-                           ||key|value|
-                           |+---+-----+
-                           |+---+-----+
-                           |only showing top 0 rows
-                           |""".stripMargin
-    assert(testData.select($"*").showString(-1) === expectedAnswer)
-  }
-
-  test("showString(negative), vertical = true") {
-    val expectedAnswer = "(0 rows)\n"
-    assert(testData.select($"*").showString(-1, vertical = true) === 
expectedAnswer)
-  }
-
-  test("showString(0)") {
-    val expectedAnswer = """+---+-----+
-                           ||key|value|
-                           |+---+-----+
-                           |+---+-----+
-                           |only showing top 0 rows
-                           |""".stripMargin
-    assert(testData.select($"*").showString(0) === expectedAnswer)
-  }
-
-  test("showString(Int.MaxValue)") {
-    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
-    val expectedAnswer = """+---+---+
-                           ||  a|  b|
-                           |+---+---+
-                           ||  1|  2|
-                           ||  3|  4|
-                           |+---+---+
-                           |""".stripMargin
-    assert(df.showString(Int.MaxValue) === expectedAnswer)
-  }
-
-  test("showString(0), vertical = true") {
-    val expectedAnswer = "(0 rows)\n"
-    assert(testData.select($"*").showString(0, vertical = true) === 
expectedAnswer)
-  }
-
-  test("showString: array") {
-    val df = Seq(
-      (Array(1, 2, 3), Array(1, 2, 3)),
-      (Array(2, 3, 4), Array(2, 3, 4))
-    ).toDF()
-    val expectedAnswer = """+---------+---------+
-                           ||       _1|       _2|
-                           |+---------+---------+
-                           ||[1, 2, 3]|[1, 2, 3]|
-                           ||[2, 3, 4]|[2, 3, 4]|
-                           |+---------+---------+
-                           |""".stripMargin
-    assert(df.showString(10) === expectedAnswer)
-  }
-
-  test("showString: array, vertical = true") {
-    val df = Seq(
-      (Array(1, 2, 3), Array(1, 2, 3)),
-      (Array(2, 3, 4), Array(2, 3, 4))
-    ).toDF()
-    val expectedAnswer = "-RECORD 0--------\n" +
-                         " _1  | [1, 2, 3] \n" +
-                         " _2  | [1, 2, 3] \n" +
-                         "-RECORD 1--------\n" +
-                         " _1  | [2, 3, 4] \n" +
-                         " _2  | [2, 3, 4] \n"
-    assert(df.showString(10, vertical = true) === expectedAnswer)
-  }
-
-  test("showString: binary") {
-    val df = Seq(
-      ("12".getBytes(StandardCharsets.UTF_8), 
"ABC.".getBytes(StandardCharsets.UTF_8)),
-      ("34".getBytes(StandardCharsets.UTF_8), 
"12346".getBytes(StandardCharsets.UTF_8))
-    ).toDF()
-    val expectedAnswer = """+-------+----------------+
-                           ||     _1|              _2|
-                           |+-------+----------------+
-                           ||[31 32]|   [41 42 43 2E]|
-                           ||[33 34]|[31 32 33 34 36]|
-                           |+-------+----------------+
-                           |""".stripMargin
-    assert(df.showString(10) === expectedAnswer)
-  }
-
-  test("showString: binary, vertical = true") {
-    val df = Seq(
-      ("12".getBytes(StandardCharsets.UTF_8), 
"ABC.".getBytes(StandardCharsets.UTF_8)),
-      ("34".getBytes(StandardCharsets.UTF_8), 
"12346".getBytes(StandardCharsets.UTF_8))
-    ).toDF()
-    val expectedAnswer = "-RECORD 0---------------\n" +
-                         " _1  | [31 32]          \n" +
-                         " _2  | [41 42 43 2E]    \n" +
-                         "-RECORD 1---------------\n" +
-                         " _1  | [33 34]          \n" +
-                         " _2  | [31 32 33 34 36] \n"
-    assert(df.showString(10, vertical = true) === expectedAnswer)
-  }
-
-  test("showString: minimum column width") {
-    val df = Seq(
-      (1, 1),
-      (2, 2)
-    ).toDF()
-    val expectedAnswer = """+---+---+
-                           || _1| _2|
-                           |+---+---+
-                           ||  1|  1|
-                           ||  2|  2|
-                           |+---+---+
-                           |""".stripMargin
-    assert(df.showString(10) === expectedAnswer)
-  }
-
-  test("showString: minimum column width, vertical = true") {
-    val df = Seq(
-      (1, 1),
-      (2, 2)
-    ).toDF()
-    val expectedAnswer = "-RECORD 0--\n" +
-                         " _1  | 1   \n" +
-                         " _2  | 1   \n" +
-                         "-RECORD 1--\n" +
-                         " _1  | 2   \n" +
-                         " _2  | 2   \n"
-    assert(df.showString(10, vertical = true) === expectedAnswer)
-  }
-
-  test("SPARK-33690: showString: escape meta-characters") {
-    val df1 = spark.sql("SELECT 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh'")
-    assert(df1.showString(1, truncate = 0) ===
-      """+--------------------------------------+
-        ||aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh|
-        |+--------------------------------------+
-        ||aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh|
-        |+--------------------------------------+
-        |""".stripMargin)
-
-    val df2 = spark.sql("SELECT 
array('aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    assert(df2.showString(1, truncate = 0) ===
-      """+---------------------------------------------+
-        ||array(aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
-        |+---------------------------------------------+
-        ||[aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh]     |
-        |+---------------------------------------------+
-        |""".stripMargin)
-
-    val df3 =
-      spark.sql("SELECT map('aaa\nbbb\tccc', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    assert(df3.showString(1, truncate = 0) ===
-      """+----------------------------------------------------------+
-        ||map(aaa\nbbb\tccc, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
-        |+----------------------------------------------------------+
-        ||{aaa\nbbb\tccc -> aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh} |
-        |+----------------------------------------------------------+
-        |""".stripMargin)
-
-    val df4 =
-      spark.sql("SELECT named_struct('v', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    assert(df4.showString(1, truncate = 0) ===
-      """+-------------------------------------------------------+
-        ||named_struct(v, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh)|
-        |+-------------------------------------------------------+
-        ||{aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh}               |
-        |+-------------------------------------------------------+
-        |""".stripMargin)
-  }
-
-  test("SPARK-34308: printSchema: escape meta-characters") {
-    val captured = new ByteArrayOutputStream()
-
-    val df1 = spark.sql("SELECT 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh'")
-    Console.withOut(captured) {
-      df1.printSchema()
-    }
-    assert(captured.toString ===
-      """root
-        | |-- aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh: string (nullable = false)
-        |
-        |""".stripMargin)
-    captured.reset()
-
-    val df2 = spark.sql("SELECT 
array('aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    Console.withOut(captured) {
-      df2.printSchema()
-    }
-    assert(captured.toString ===
-      """root
-        | |-- array(aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): array (nullable = 
false)
-        | |    |-- element: string (containsNull = false)
-        |
-        |""".stripMargin)
-    captured.reset()
-
-    val df3 =
-      spark.sql("SELECT map('aaa\nbbb\tccc', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    Console.withOut(captured) {
-      df3.printSchema()
-    }
-    assert(captured.toString ===
-      """root
-        | |-- map(aaa\nbbb\tccc, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): map 
(nullable = false)
-        | |    |-- key: string
-        | |    |-- value: string (valueContainsNull = false)
-        |
-        |""".stripMargin)
-    captured.reset()
-
-    val df4 =
-      spark.sql("SELECT named_struct('v', 
'aaa\nbbb\tccc\rddd\feee\bfff\u000Bggg\u0007hhh')")
-    Console.withOut(captured) {
-      df4.printSchema()
-    }
-    assert(captured.toString ===
-      """root
-        | |-- named_struct(v, aaa\nbbb\tccc\rddd\feee\bfff\vggg\ahhh): struct 
(nullable = false)
-        | |    |-- v: string (nullable = false)
-        |
-        |""".stripMargin)
-  }
-
-  test("SPARK-7319 showString") {
-    val expectedAnswer = """+---+-----+
-                           ||key|value|
-                           |+---+-----+
-                           ||  1|    1|
-                           |+---+-----+
-                           |only showing top 1 row
-                           |""".stripMargin
-    assert(testData.select($"*").showString(1) === expectedAnswer)
-  }
-
-  test("SPARK-7319 showString, vertical = true") {
-    val expectedAnswer = "-RECORD 0----\n" +
-                         " key   | 1   \n" +
-                         " value | 1   \n" +
-                         "only showing top 1 row\n"
-    assert(testData.select($"*").showString(1, vertical = true) === 
expectedAnswer)
-  }
-
-  test("SPARK-23023 Cast rows to strings in showString") {
-    val df1 = Seq(Seq(1, 2, 3, 4)).toDF("a")
-    assert(df1.showString(10) ===
-      s"""+------------+
-         ||           a|
-         |+------------+
-         ||[1, 2, 3, 4]|
-         |+------------+
-         |""".stripMargin)
-    val df2 = Seq(Map(1 -> "a", 2 -> "b")).toDF("a")
-    assert(df2.showString(10) ===
-      s"""+----------------+
-         ||               a|
-         |+----------------+
-         ||{1 -> a, 2 -> b}|
-         |+----------------+
-         |""".stripMargin)
-    val df3 = Seq(((1, "a"), 0), ((2, "b"), 0)).toDF("a", "b")
-    assert(df3.showString(10) ===
-      s"""+------+---+
-         ||     a|  b|
-         |+------+---+
-         ||{1, a}|  0|
-         ||{2, b}|  0|
-         |+------+---+
-         |""".stripMargin)
-  }
-
-  test("SPARK-7327 show with empty dataFrame") {
-    val expectedAnswer = """+---+-----+
-                           ||key|value|
-                           |+---+-----+
-                           |+---+-----+
-                           |""".stripMargin
-    assert(testData.select($"*").filter($"key" < 0).showString(1) === 
expectedAnswer)
-  }
-
-  test("SPARK-7327 show with empty dataFrame, vertical = true") {
-    assert(testData.select($"*").filter($"key" < 0).showString(1, vertical = 
true) === "(0 rows)\n")
-  }
-
-  test("SPARK-18350 show with session local timezone") {
-    val d = Date.valueOf("2016-12-01")
-    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
-    val df = Seq((d, ts)).toDF("d", "ts")
-    val expectedAnswer = """+----------+-------------------+
-                           ||d         |ts                 |
-                           |+----------+-------------------+
-                           ||2016-12-01|2016-12-01 00:00:00|
-                           |+----------+-------------------+
-                           |""".stripMargin
-    assert(df.showString(1, truncate = 0) === expectedAnswer)
-
-    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
-
-      val expectedAnswer = """+----------+-------------------+
-                             ||d         |ts                 |
-                             |+----------+-------------------+
-                             ||2016-12-01|2016-12-01 08:00:00|
-                             |+----------+-------------------+
-                             |""".stripMargin
-      assert(df.showString(1, truncate = 0) === expectedAnswer)
-    }
-  }
-
-  test("SPARK-18350 show with session local timezone, vertical = true") {
-    val d = Date.valueOf("2016-12-01")
-    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
-    val df = Seq((d, ts)).toDF("d", "ts")
-    val expectedAnswer = "-RECORD 0------------------\n" +
-                         " d   | 2016-12-01          \n" +
-                         " ts  | 2016-12-01 00:00:00 \n"
-    assert(df.showString(1, truncate = 0, vertical = true) === expectedAnswer)
-
-    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
-
-      val expectedAnswer = "-RECORD 0------------------\n" +
-                           " d   | 2016-12-01          \n" +
-                           " ts  | 2016-12-01 08:00:00 \n"
-      assert(df.showString(1, truncate = 0, vertical = true) === 
expectedAnswer)
-    }
-  }
-
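showString itself is private[sql]; the nearest public equivalent of the truncation and vertical behaviour checked above is Dataset.show(numRows, truncate, vertical). A minimal sketch, assuming the spark session and implicits from the earlier sketch:

    val df = Seq("1", "1" * 21).toDF("value")

    // truncate = 0 disables truncation; truncate = 20 cuts long cells to 20 characters,
    // ending them with "...".
    df.show(10, truncate = 0)
    df.show(10, truncate = 20)

    // vertical = true prints one "-RECORD n-" block per row instead of a table.
    df.show(10, truncate = 20, vertical = true)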
   test("createDataFrame(RDD[Row], StructType) should convert UDTs 
(SPARK-6672)") {
     val rowRDD = sparkContext.parallelize(Seq(Row(new ExamplePoint(1.0, 2.0))))
     val schema = StructType(Array(StructField("point", new ExamplePointUDT(), 
false)))
@@ -1714,16 +1114,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(decimalData.agg(avg("a")), Row(new java.math.BigDecimal(2)))
   }
 
-  test("SPARK-7133: Implement struct, array, and map field accessor") {
-    assert(complexData.filter(complexData("a")(0) === 2).count() == 1)
-    if (!conf.ansiEnabled) {
-      assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
-    }
-    assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
-    assert(complexData.filter(complexData("m")(complexData("s")("value")) === 
1).count() == 1)
-    assert(complexData.filter(complexData("a")(complexData("s")("key")) === 
1).count() == 1)
-  }
-
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {
     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
       val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 
1}}}}""").toDS())
@@ -1900,15 +1290,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-8608: call `show` on local DataFrame with random columns should 
return same value") {
-    val df = testData.select(rand(33))
-    assert(df.showString(5) == df.showString(5))
-
-    // We will reuse the same Expression object for LocalRelation.
-    val df1 = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
-    assert(df1.showString(5) == df1.showString(5))
-  }
-
   test("SPARK-8609: local DataFrame with random columns should return same 
value after sort") {
     checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
 
@@ -2148,82 +1529,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-39834: build the stats for LogicalRDD based on origin stats") {
-    def buildExpectedColumnStats(attrs: Seq[Attribute]): 
AttributeMap[ColumnStat] = {
-      AttributeMap(
-        attrs.map {
-          case attr if attr.dataType == BooleanType =>
-            attr -> ColumnStat(
-              distinctCount = Some(2),
-              min = Some(false),
-              max = Some(true),
-              nullCount = Some(0),
-              avgLen = Some(1),
-              maxLen = Some(1))
-
-          case attr if attr.dataType == ByteType =>
-            attr -> ColumnStat(
-              distinctCount = Some(2),
-              min = Some(1),
-              max = Some(2),
-              nullCount = Some(0),
-              avgLen = Some(1),
-              maxLen = Some(1))
-
-          case attr => attr -> ColumnStat()
-        }
-      )
-    }
-
-    val outputList = Seq(
-      AttributeReference("cbool", BooleanType)(),
-      AttributeReference("cbyte", ByteType)(),
-      AttributeReference("cint", IntegerType)()
-    )
-
-    val expectedSize = 16
-    val statsPlan = OutputListAwareStatsTestPlan(
-      outputList = outputList,
-      rowCount = 2,
-      size = Some(expectedSize))
-
-    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      val df = Dataset.ofRows(spark, statsPlan)
-        // add some map-like operations which optimizer will optimize away, 
and make a divergence
-        // for output between logical plan and optimized plan
-        // logical plan
-        // Project [cb#6 AS cbool#12, cby#7 AS cbyte#13, ci#8 AS cint#14]
-        // +- Project [cbool#0 AS cb#6, cbyte#1 AS cby#7, cint#2 AS ci#8]
-        //    +- OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16
-        // optimized plan
-        // OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16
-        .selectExpr("cbool AS cb", "cbyte AS cby", "cint AS ci")
-        .selectExpr("cb AS cbool", "cby AS cbyte", "ci AS cint")
-
-      // We can't leverage LogicalRDD.fromDataset here, since it triggers 
physical planning and
-      // there is no matching physical node for OutputListAwareStatsTestPlan.
-      val optimizedPlan = df.queryExecution.optimizedPlan
-      val rewrite = LogicalRDD.buildOutputAssocForRewrite(optimizedPlan.output,
-        df.logicalPlan.output)
-      val logicalRDD = LogicalRDD(
-        df.logicalPlan.output, spark.sparkContext.emptyRDD[InternalRow], 
isStreaming = true)(
-        spark, Some(LogicalRDD.rewriteStatistics(optimizedPlan.stats, 
rewrite.get)), None)
-
-      val stats = logicalRDD.computeStats()
-      val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = 
Some(2),
-        attributeStats = buildExpectedColumnStats(logicalRDD.output))
-      assert(stats === expectedStats)
-
-      // This method re-issues expression IDs for all outputs. We expect 
column stats to be
-      // reflected as well.
-      val newLogicalRDD = logicalRDD.newInstance()
-      val newStats = newLogicalRDD.computeStats()
-      val newExpectedStats = Statistics(sizeInBytes = expectedSize, rowCount = 
Some(2),
-        attributeStats = buildExpectedColumnStats(newLogicalRDD.output))
-      assert(newStats === newExpectedStats)
-    }
-  }
-
   test("SPARK-39834: build the constraints for LogicalRDD based on origin 
constraints") {
     def buildExpectedConstraints(attrs: Seq[Attribute]): ExpressionSet = {
       val exprs = attrs.flatMap { attr =>
@@ -2296,64 +1601,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(df.withColumnRenamed("d^'a.", "a"), Row(1, "a"))
   }
 
-  test("SPARK-11725: correctly handle null inputs for ScalaUDF") {
-    val df = sparkContext.parallelize(Seq(
-      java.lang.Integer.valueOf(22) -> "John",
-      null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name")
-
-    // passing null into the UDF that could handle it
-    val boxedUDF = udf[java.lang.Integer, java.lang.Integer] {
-      (i: java.lang.Integer) => if (i == null) -10 else null
-    }
-    checkAnswer(df.select(boxedUDF($"age")), Row(null) :: Row(-10) :: Nil)
-
-    spark.udf.register("boxedUDF",
-      (i: java.lang.Integer) => (if (i == null) -10 else null): 
java.lang.Integer)
-    checkAnswer(sql("select boxedUDF(null), boxedUDF(-1)"), Row(-10, null) :: 
Nil)
-
-    val primitiveUDF = udf((i: Int) => i * 2)
-    checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil)
-  }
-
-  test("SPARK-12398 truncated toString") {
-    val df1 = Seq((1L, "row1")).toDF("id", "name")
-    assert(df1.toString() === "[id: bigint, name: string]")
-
-    val df2 = Seq((1L, "c2", false)).toDF("c1", "c2", "c3")
-    assert(df2.toString === "[c1: bigint, c2: string ... 1 more field]")
-
-    val df3 = Seq((1L, "c2", false, 10)).toDF("c1", "c2", "c3", "c4")
-    assert(df3.toString === "[c1: bigint, c2: string ... 2 more fields]")
-
-    val df4 = Seq((1L, Tuple2(1L, "val"))).toDF("c1", "c2")
-    assert(df4.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string>]")
-
-    val df5 = Seq((1L, Tuple2(1L, "val"), 20.0)).toDF("c1", "c2", "c3")
-    assert(df5.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string> 
... 1 more field]")
-
-    val df6 = Seq((1L, Tuple2(1L, "val"), 20.0, 1)).toDF("c1", "c2", "c3", 
"c4")
-    assert(df6.toString === "[c1: bigint, c2: struct<_1: bigint, _2: string> 
... 2 more fields]")
-
-    val df7 = Seq((1L, Tuple3(1L, "val", 2), 20.0, 1)).toDF("c1", "c2", "c3", 
"c4")
-    assert(
-      df7.toString ===
-        "[c1: bigint, c2: struct<_1: bigint, _2: string ... 1 more field> ... 
2 more fields]")
-
-    val df8 = Seq((1L, Tuple7(1L, "val", 2, 3, 4, 5, 6), 20.0, 1)).toDF("c1", 
"c2", "c3", "c4")
-    assert(
-      df8.toString ===
-        "[c1: bigint, c2: struct<_1: bigint, _2: string ... 5 more fields> ... 
2 more fields]")
-
-    val df9 =
-      Seq((1L, Tuple4(1L, Tuple4(1L, 2L, 3L, 4L), 2L, 3L), 20.0, 
1)).toDF("c1", "c2", "c3", "c4")
-    assert(
-      df9.toString ===
-        "[c1: bigint, c2: struct<_1: bigint," +
-          " _2: struct<_1: bigint," +
-          " _2: bigint ... 2 more fields> ... 2 more fields> ... 2 more 
fields]")
-
-  }
-
   test("reuse exchange") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
       val df = spark.range(100).toDF()
@@ -2431,66 +1678,11 @@ class DataFrameSuite extends QueryTest
     assert(e.getStackTrace.head.getClassName != 
classOf[QueryExecution].getName)
   }
 
-  test("SPARK-13774: Check error message for non existent path without globbed 
paths") {
-    val uuid = UUID.randomUUID().toString
-    val baseDir = Utils.createTempDir()
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.read.format("csv").load(
-          new File(baseDir, "file").getAbsolutePath,
-          new File(baseDir, "file2").getAbsolutePath,
-          new File(uuid, "file3").getAbsolutePath,
-          uuid).rdd
-      },
-      errorClass = "PATH_NOT_FOUND",
-      parameters = Map("path" -> "file:.*"),
-      matchPVals = true
-    )
-   }
-
-  test("SPARK-13774: Check error message for not existent globbed paths") {
-    // Non-existent initial path component:
-    val nonExistentBasePath = "/" + UUID.randomUUID().toString
-    assert(!new File(nonExistentBasePath).exists())
-    checkError(
-      exception = intercept[AnalysisException] {
-        spark.read.format("text").load(s"$nonExistentBasePath/*")
-      },
-      errorClass = "PATH_NOT_FOUND",
-      parameters = Map("path" -> s"file:$nonExistentBasePath/*")
-    )
-
-    // Existent initial path component, but no matching files:
-    val baseDir = Utils.createTempDir()
-    val childDir = Utils.createTempDir(baseDir.getAbsolutePath)
-    assert(childDir.exists())
-    try {
-      checkError(
-        exception = intercept[AnalysisException] {
-          spark.read.json(s"${baseDir.getAbsolutePath}/*/*-xyz.json").rdd
-        },
-        errorClass = "PATH_NOT_FOUND",
-        parameters = Map("path" -> 
s"file:${baseDir.getAbsolutePath}/*/*-xyz.json")
-      )
-    } finally {
-      Utils.deleteRecursively(baseDir)
-    }
-  }
-
   test("SPARK-15230: distinct() does not handle column name with dot 
properly") {
     val df = Seq(1, 1, 2).toDF("column.with.dot")
     checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil)
   }
 
-  test("SPARK-16181: outer join with isNull filter") {
-    val left = Seq("x").toDF("col")
-    val right = Seq("y").toDF("col").withColumn("new", lit(true))
-    val joined = left.join(right, left("col") === right("col"), "left_outer")
-
-    checkAnswer(joined, Row("x", null, null))
-    checkAnswer(joined.filter($"new".isNull), Row("x", null, null))
-  }
-
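The join pattern above pairs a left outer join with an isNull filter: right-side columns of unmatched rows come back as null, so filtering on isNull keeps exactly the unmatched left rows. A minimal sketch under the same session assumptions, with illustrative data:

    import org.apache.spark.sql.functions.lit

    val left = Seq("x", "y").toDF("col")
    val right = Seq("y").toDF("col").withColumn("matched", lit(true))

    val joined = left.join(right, left("col") === right("col"), "left_outer")
    joined.show()                            // ("x", null, null) and ("y", "y", true)
    joined.filter($"matched".isNull).show()  // keeps only the unmatched "x" row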
   test("SPARK-16664: persist with more than 200 columns") {
     val size = 201L
     val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size))))
@@ -2611,21 +1803,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(df.select($"i" === $"j"), Row(true) :: Row(false) :: Nil)
   }
 
-  test("SPARK-19691 Calculating percentile of decimal column fails with 
ClassCastException") {
-    val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as 
x").selectExpr("percentile(x, 0.5)")
-    checkAnswer(df, Row(BigDecimal(0)) :: Nil)
-  }
-
-  test("SPARK-20359: catalyst outer join optimization should not throw npe") {
-    val df1 = Seq("a", "b", "c").toDF("x")
-      .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" 
}.apply($"x"))
-    val df2 = Seq("a", "b").toDF("x1")
-    df1
-      .join(df2, df1("x") === df2("x1"), "left_outer")
-      .filter($"x1".isNotNull || !$"y".isin("a!"))
-      .count()
-  }
-
   // The fix of SPARK-21720 avoid an exception regarding JVM code size limit
   // TODO: When we make a threshold of splitting statements (1024) 
configurable,
   // we will re-enable this with max threshold to cause an exception
@@ -2651,16 +1828,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-20897: cached self-join should not fail") {
-    // force to plan sort merge join
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
-      val df = Seq(1 -> "a").toDF("i", "j")
-      val df1 = df.as("t1")
-      val df2 = df.as("t2")
-      assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
-    }
-  }
-
   test("order-by ordinal.") {
     checkAnswer(
       testData2.select(lit(7), $"a", $"b").orderBy(lit(1), lit(2), lit(3)),
@@ -2689,69 +1856,11 @@ class DataFrameSuite extends QueryTest
     assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec])
   }
 
-  test("SPARK-24165: CaseWhen/If - nullability of nested types") {
-    val rows = new java.util.ArrayList[Row]()
-    rows.add(Row(true, ("x", 1), Seq("x", "y"), Map(0 -> "x")))
-    rows.add(Row(false, (null, 2), Seq(null, "z"), Map(0 -> null)))
-    val schema = StructType(Seq(
-      StructField("cond", BooleanType, true),
-      StructField("s", StructType(Seq(
-        StructField("val1", StringType, true),
-        StructField("val2", IntegerType, false)
-      )), false),
-      StructField("a", ArrayType(StringType, true)),
-      StructField("m", MapType(IntegerType, StringType, true))
-    ))
-
-    val sourceDF = spark.createDataFrame(rows, schema)
-
-    def structWhenDF: DataFrame = sourceDF
-      .select(when($"cond",
-        struct(lit("a").as("val1"), lit(10).as("val2"))).otherwise($"s") as 
"res")
-      .select($"res".getField("val1"))
-    def arrayWhenDF: DataFrame = sourceDF
-      .select(when($"cond", array(lit("a"), lit("b"))).otherwise($"a") as 
"res")
-      .select($"res".getItem(0))
-    def mapWhenDF: DataFrame = sourceDF
-      .select(when($"cond", map(lit(0), lit("a"))).otherwise($"m") as "res")
-      .select($"res".getItem(0))
-
-    def structIfDF: DataFrame = sourceDF
-      .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as "res")
-      .select($"res".getField("val1"))
-    def arrayIfDF: DataFrame = sourceDF
-      .select(expr("if(cond, array('a', 'b'), a)") as "res")
-      .select($"res".getItem(0))
-    def mapIfDF: DataFrame = sourceDF
-      .select(expr("if(cond, map(0, 'a'), m)") as "res")
-      .select($"res".getItem(0))
-
-    def checkResult(): Unit = {
-      checkAnswer(structWhenDF, Seq(Row("a"), Row(null)))
-      checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null)))
-      checkAnswer(mapWhenDF, Seq(Row("a"), Row(null)))
-      checkAnswer(structIfDF, Seq(Row("a"), Row(null)))
-      checkAnswer(arrayIfDF, Seq(Row("a"), Row(null)))
-      checkAnswer(mapIfDF, Seq(Row("a"), Row(null)))
-    }
-
-    // Test with local relation, the Project will be evaluated without codegen
-    checkResult()
-    // Test with cached relation, the Project will be evaluated with codegen
-    sourceDF.cache()
-    checkResult()
-  }
-
   test("Uuid expressions should produce same results at retries in the same 
DataFrame") {
     val df = spark.range(1).select($"id", new Column(Uuid()))
     checkAnswer(df, df.collect())
   }
 
-  test("SPARK-24313: access map with binary keys") {
-    val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
-    checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
-  }
-
   test("SPARK-24781: Using a reference from Dataset in Filter/Sort") {
     val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
     val filter1 = df.select(df("name")).filter(df("id") === 0)
@@ -2779,31 +1888,6 @@ class DataFrameSuite extends QueryTest
     }
   }
 
-  test("SPARK-25159: json schema inference should only trigger one job") {
-    withTempPath { path =>
-      // This test is to prove that the `JsonInferSchema` does not use 
`RDD#toLocalIterator` which
-      // triggers one Spark job per RDD partition.
-      Seq(1 -> "a", 2 -> "b").toDF("i", "p")
-        // The data set has 2 partitions, so Spark will write at least 2 json 
files.
-        // Use a non-splittable compression (gzip), to make sure the json scan 
RDD has at least 2
-        // partitions.
-        .write.partitionBy("p")
-        .option("compression", 
GZIP.lowerCaseName()).json(path.getCanonicalPath)
-
-      val numJobs = new AtomicLong(0)
-      sparkContext.addSparkListener(new SparkListener {
-        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-          numJobs.incrementAndGet()
-        }
-      })
-
-      val df = spark.read.json(path.getCanonicalPath)
-      assert(df.columns === Array("i", "p"))
-      spark.sparkContext.listenerBus.waitUntilEmpty()
-      assert(numJobs.get() == 1L)
-    }
-  }
-
   test("SPARK-25402 Null handling in BooleanSimplification") {
     val schema = StructType.fromDDL("a boolean, b int")
     val rows = Seq(Row(null, 1))
@@ -3038,80 +2122,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(df.selectExpr("b"), Row(new CalendarInterval(1, 2, 3)))
   }
 
-  test("SPARK-31552: array encoder with different types") {
-    // primitives
-    val booleans = Array(true, false)
-    checkAnswer(Seq(booleans).toDF(), Row(booleans))
-
-    val bytes = Array(1.toByte, 2.toByte)
-    checkAnswer(Seq(bytes).toDF(), Row(bytes))
-    val shorts = Array(1.toShort, 2.toShort)
-    checkAnswer(Seq(shorts).toDF(), Row(shorts))
-    val ints = Array(1, 2)
-    checkAnswer(Seq(ints).toDF(), Row(ints))
-    val longs = Array(1L, 2L)
-    checkAnswer(Seq(longs).toDF(), Row(longs))
-
-    val floats = Array(1.0F, 2.0F)
-    checkAnswer(Seq(floats).toDF(), Row(floats))
-    val doubles = Array(1.0D, 2.0D)
-    checkAnswer(Seq(doubles).toDF(), Row(doubles))
-
-    val strings = Array("2020-04-24", "2020-04-25")
-    checkAnswer(Seq(strings).toDF(), Row(strings))
-
-    // tuples
-    val decOne = Decimal(1, 38, 18)
-    val decTwo = Decimal(2, 38, 18)
-    val tuple1 = (1, 2.2, "3.33", decOne, Date.valueOf("2012-11-22"))
-    val tuple2 = (2, 3.3, "4.44", decTwo, Date.valueOf("2022-11-22"))
-    checkAnswer(Seq(Array(tuple1, tuple2)).toDF(), Seq(Seq(tuple1, 
tuple2)).toDF())
-
-    // case classes
-    val gbks = Array(GroupByKey(1, 2), GroupByKey(4, 5))
-    checkAnswer(Seq(gbks).toDF(), Row(Array(Row(1, 2), Row(4, 5))))
-
-    // We can move this implicit def to [[SQLImplicits]] when we eventually 
make fully
-    // support for array encoder like Seq and Set
-    // For now cases below, decimal/datetime/interval/binary/nested types, etc,
-    // are not supported by array
-    implicit def newArrayEncoder[T <: Array[_] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
-
-    // decimals
-    val decSpark = Array(decOne, decTwo)
-    val decScala = decSpark.map(_.toBigDecimal)
-    val decJava = decSpark.map(_.toJavaBigDecimal)
-    checkAnswer(Seq(decSpark).toDF(), Row(decJava))
-    checkAnswer(Seq(decScala).toDF(), Row(decJava))
-    checkAnswer(Seq(decJava).toDF(), Row(decJava))
-
-    // datetimes and intervals
-    val dates = strings.map(Date.valueOf)
-    checkAnswer(Seq(dates).toDF(), Row(dates))
-    val localDates = dates.map(d => 
DateTimeUtils.daysToLocalDate(DateTimeUtils.fromJavaDate(d)))
-    checkAnswer(Seq(localDates).toDF(), Row(dates))
-
-    val timestamps =
-      Array(Timestamp.valueOf("2020-04-24 12:34:56"), 
Timestamp.valueOf("2020-04-24 11:22:33"))
-    checkAnswer(Seq(timestamps).toDF(), Row(timestamps))
-    val instants =
-      timestamps.map(t => 
DateTimeUtils.microsToInstant(DateTimeUtils.fromJavaTimestamp(t)))
-    checkAnswer(Seq(instants).toDF(), Row(timestamps))
-
-    val intervals = Array(new CalendarInterval(1, 2, 3), new 
CalendarInterval(4, 5, 6))
-    checkAnswer(Seq(intervals).toDF(), Row(intervals))
-
-    // binary
-    val bins = Array(Array(1.toByte), Array(2.toByte), Array(3.toByte), 
Array(4.toByte))
-    checkAnswer(Seq(bins).toDF(), Row(bins))
-
-    // nested
-    val nestedIntArray = Array(Array(1), Array(2))
-    checkAnswer(Seq(nestedIntArray).toDF(), 
Row(nestedIntArray.map(wrapIntArray)))
-    val nestedDecArray = Array(decSpark)
-    checkAnswer(Seq(nestedDecArray).toDF(), Row(Array(wrapRefArray(decJava))))
-  }
-
   test("SPARK-31750: eliminate UpCast if child's dataType is DecimalType") {
     withTempPath { f =>
       sql("select cast(1 as decimal(38, 0)) as d")
@@ -3128,10 +2138,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(df.selectExpr("ln(d)"), Row(Double.NaN))
   }
 
-  test("SPARK-32761: aggregating multiple distinct CONSTANT columns") {
-     checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1, 
1))
-  }
-
   test("SPARK-32764: -0.0 and 0.0 should be equal") {
     val df = Seq(0.0 -> -0.0).toDF("pos", "neg")
     checkAnswer(df.select($"pos" > $"neg"), Row(false))
@@ -3232,79 +2238,6 @@ class DataFrameSuite extends QueryTest
     checkAnswer(test.select($"best_name.name"), Row("bob") :: Row("bob") :: 
Row("sam") :: Nil)
   }
 
-  test("SPARK-34829: Multiple applications of typed ScalaUDFs in higher order 
functions work") {
-    val reverse = udf((s: String) => s.reverse)
-    val reverse2 = udf((b: Bar2) => Bar2(b.s.reverse))
-
-    val df = Seq(Array("abc", "def")).toDF("array")
-    val test = df.select(transform(col("array"), s => reverse(s)))
-    checkAnswer(test, Row(Array("cba", "fed")) :: Nil)
-
-    val df2 = Seq(Array(Bar2("abc"), Bar2("def"))).toDF("array")
-    val test2 = df2.select(transform(col("array"), b => reverse2(b)))
-    checkAnswer(test2, Row(Array(Row("cba"), Row("fed"))) :: Nil)
-
-    val df3 = Seq(Map("abc" -> 1, "def" -> 2)).toDF("map")
-    val test3 = df3.select(transform_keys(col("map"), (s, _) => reverse(s)))
-    checkAnswer(test3, Row(Map("cba" -> 1, "fed" -> 2)) :: Nil)
-
-    val df4 = Seq(Map(Bar2("abc") -> 1, Bar2("def") -> 2)).toDF("map")
-    val test4 = df4.select(transform_keys(col("map"), (b, _) => reverse2(b)))
-    checkAnswer(test4, Row(Map(Row("cba") -> 1, Row("fed") -> 2)) :: Nil)
-
-    val df5 = Seq(Map(1 -> "abc", 2 -> "def")).toDF("map")
-    val test5 = df5.select(transform_values(col("map"), (_, s) => reverse(s)))
-    checkAnswer(test5, Row(Map(1 -> "cba", 2 -> "fed")) :: Nil)
-
-    val df6 = Seq(Map(1 -> Bar2("abc"), 2 -> Bar2("def"))).toDF("map")
-    val test6 = df6.select(transform_values(col("map"), (_, b) => reverse2(b)))
-    checkAnswer(test6, Row(Map(1 -> Row("cba"), 2 -> Row("fed"))) :: Nil)
-
-    val reverseThenConcat = udf((s1: String, s2: String) => s1.reverse ++ 
s2.reverse)
-    val reverseThenConcat2 = udf((b1: Bar2, b2: Bar2) => Bar2(b1.s.reverse ++ 
b2.s.reverse))
-
-    val df7 = Seq((Map(1 -> "abc", 2 -> "def"), Map(1 -> "ghi", 2 -> 
"jkl"))).toDF("map1", "map2")
-    val test7 =
-      df7.select(map_zip_with(col("map1"), col("map2"), (_, s1, s2) => 
reverseThenConcat(s1, s2)))
-    checkAnswer(test7, Row(Map(1 -> "cbaihg", 2 -> "fedlkj")) :: Nil)
-
-    val df8 = Seq((Map(1 -> Bar2("abc"), 2 -> Bar2("def")),
-      Map(1 -> Bar2("ghi"), 2 -> Bar2("jkl")))).toDF("map1", "map2")
-    val test8 =
-      df8.select(map_zip_with(col("map1"), col("map2"), (_, b1, b2) => 
reverseThenConcat2(b1, b2)))
-    checkAnswer(test8, Row(Map(1 -> Row("cbaihg"), 2 -> Row("fedlkj"))) :: Nil)
-
-    val df9 = Seq((Array("abc", "def"), Array("ghi", "jkl"))).toDF("array1", 
"array2")
-    val test9 =
-      df9.select(zip_with(col("array1"), col("array2"), (s1, s2) => 
reverseThenConcat(s1, s2)))
-    checkAnswer(test9, Row(Array("cbaihg", "fedlkj")) :: Nil)
-
-    val df10 = Seq((Array(Bar2("abc"), Bar2("def")), Array(Bar2("ghi"), 
Bar2("jkl"))))
-      .toDF("array1", "array2")
-    val test10 =
-      df10.select(zip_with(col("array1"), col("array2"), (b1, b2) => 
reverseThenConcat2(b1, b2)))
-    checkAnswer(test10, Row(Array(Row("cbaihg"), Row("fedlkj"))) :: Nil)
-  }
-
-  test("SPARK-39293: The accumulator of ArrayAggregate to handle complex types 
properly") {
-    val reverse = udf((s: String) => s.reverse)
-
-    val df = Seq(Array("abc", "def")).toDF("array")
-    val testArray = df.select(
-      aggregate(
-        col("array"),
-        array().cast("array<string>"),
-        (acc, s) => concat(acc, array(reverse(s)))))
-    checkAnswer(testArray, Row(Array("cba", "fed")) :: Nil)
-
-    val testMap = df.select(
-      aggregate(
-        col("array"),
-        map().cast("map<string, string>"),
-        (acc, s) => map_concat(acc, map(s, reverse(s)))))
-    checkAnswer(testMap, Row(Map("abc" -> "cba", "def" -> "fed")) :: Nil)
-  }
-
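The SPARK-39293 test above exercises the aggregate() higher-order function with UDF-backed merge functions. A minimal sketch of the same function with a plain column expression, under the same session assumptions:

    import org.apache.spark.sql.functions.{aggregate, col, lit}

    val df = Seq(Seq(1, 2, 3, 4)).toDF("xs")

    // Folds the array left to right: 0 + 1 + 2 + 3 + 4 = 10.
    df.select(aggregate(col("xs"), lit(0), (acc, x) => acc + x).as("sum")).show()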
   test("SPARK-34882: Aggregate with multiple distinct null sensitive 
aggregators") {
     withUserDefinedFunction(("countNulls", true)) {
       spark.udf.register("countNulls", udaf(new Aggregator[JLong, JLong, 
JLong] {
@@ -3405,87 +2338,6 @@ class DataFrameSuite extends QueryTest
     assert(ids.take(5).map(_.getLong(0)).toSet === Range(0, 5).toSet)
   }
 
-  test("SPARK-35320: Reading JSON with key type different to String in a map 
should fail") {
-    Seq(
-      (MapType(IntegerType, StringType), """{"1": "test"}"""),
-      (StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
-        """"test": {"1": "test"}"""),
-      (ArrayType(MapType(IntegerType, StringType)), """[{"1": "test"}]"""),
-      (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" 
: "test"}}""")
-    ).foreach { case (schema, jsonData) =>
-      withTempDir { dir =>
-        val colName = "col"
-        val msg = "can only contain STRING as a key type for a MAP"
-
-        val thrown1 = intercept[AnalysisException](
-          spark.read.schema(StructType(Seq(StructField(colName, schema))))
-            .json(Seq(jsonData).toDS()).collect())
-        assert(thrown1.getMessage.contains(msg))
-
-        val jsonDir = new File(dir, "json").getCanonicalPath
-        Seq(jsonData).toDF(colName).write.json(jsonDir)
-        val thrown2 = intercept[AnalysisException](
-          spark.read.schema(StructType(Seq(StructField(colName, schema))))
-            .json(jsonDir).collect())
-        assert(thrown2.getMessage.contains(msg))
-      }
-    }
-  }
-
-  test("SPARK-37855: IllegalStateException when transforming an array inside a 
nested struct") {
-    def makeInput(): DataFrame = {
-      val innerElement1 = Row(3, 3.12)
-      val innerElement2 = Row(4, 2.1)
-      val innerElement3 = Row(1, 985.2)
-      val innerElement4 = Row(10, 757548.0)
-      val innerElement5 = Row(1223, 0.665)
-
-      val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
-      val outerElement2 = Row(2, Row(List(innerElement3)))
-      val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
-
-      val data = Seq(
-        Row("row1", List(outerElement1)),
-        Row("row2", List(outerElement2, outerElement3))
-      )
-
-      val schema = new StructType()
-        .add("name", StringType)
-        .add("outer_array", ArrayType(new StructType()
-          .add("id", IntegerType)
-          .add("inner_array_struct", new StructType()
-            .add("inner_array", ArrayType(new StructType()
-              .add("id", IntegerType)
-              .add("value", DoubleType)
-            ))
-          )
-        ))
-
-      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
-    }
-
-    val df = makeInput().limit(2)
-
-    val res = df.withColumn("extracted", transform(
-      col("outer_array"),
-      c1 => {
-        struct(
-          c1.getField("id").alias("outer_id"),
-          transform(
-            c1.getField("inner_array_struct").getField("inner_array"),
-            c2 => {
-              struct(
-                c2.getField("value").alias("inner_value")
-              )
-            }
-          )
-        )
-      }
-    ))
-
-    assert(res.collect().length == 2)
-  }
-
   test("SPARK-38285: Fix ClassCastException: GenericArrayData cannot be cast 
to InternalRow") {
     withTempView("v1") {
       val sqlText =
@@ -3782,50 +2634,6 @@ case class GroupByKey(a: Int, b: Int)
 
 case class Bar2(s: String)
 
-/**
- * This class is used for unit-testing. It's a logical plan whose output and 
stats are passed in.
- */
-case class OutputListAwareStatsTestPlan(
-    outputList: Seq[Attribute],
-    rowCount: BigInt,
-    size: Option[BigInt] = None) extends LeafNode with MultiInstanceRelation {
-  override def output: Seq[Attribute] = outputList
-  override def computeStats(): Statistics = {
-    val columnInfo = outputList.map { attr =>
-      attr.dataType match {
-        case BooleanType =>
-          attr -> ColumnStat(
-            distinctCount = Some(2),
-            min = Some(false),
-            max = Some(true),
-            nullCount = Some(0),
-            avgLen = Some(1),
-            maxLen = Some(1))
-
-        case ByteType =>
-          attr -> ColumnStat(
-            distinctCount = Some(2),
-            min = Some(1),
-            max = Some(2),
-            nullCount = Some(0),
-            avgLen = Some(1),
-            maxLen = Some(1))
-
-        case _ =>
-          attr -> ColumnStat()
-      }
-    }
-    val attrStats = AttributeMap(columnInfo)
-
-    Statistics(
-      // If sizeInBytes is useless in testing, we just use a fake value
-      sizeInBytes = size.getOrElse(Int.MaxValue),
-      rowCount = Some(rowCount),
-      attributeStats = attrStats)
-  }
-  override def newInstance(): LogicalPlan = copy(outputList = 
outputList.map(_.newInstance()))
-}
-
 /**
  * This class is used for unit-testing. It's a logical plan whose output is 
passed in.
  */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index e827396009d0..393ecc95b66b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,12 +24,15 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import 
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, PST, 
UTC}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
TimeZoneUTC}
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -860,4 +863,124 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
       assert(stats.rowCount.isDefined && stats.rowCount.get == 6)
     }
   }
+
+  test("SPARK-39834: build the stats for LogicalRDD based on origin stats") {
+    def buildExpectedColumnStats(attrs: Seq[Attribute]): 
AttributeMap[ColumnStat] = {
+      AttributeMap(
+        attrs.map {
+          case attr if attr.dataType == BooleanType =>
+            attr -> ColumnStat(
+              distinctCount = Some(2),
+              min = Some(false),
+              max = Some(true),
+              nullCount = Some(0),
+              avgLen = Some(1),
+              maxLen = Some(1))
+
+          case attr if attr.dataType == ByteType =>
+            attr -> ColumnStat(
+              distinctCount = Some(2),
+              min = Some(1),
+              max = Some(2),
+              nullCount = Some(0),
+              avgLen = Some(1),
+              maxLen = Some(1))
+
+          case attr => attr -> ColumnStat()
+        }
+      )
+    }
+
+    val outputList = Seq(
+      AttributeReference("cbool", BooleanType)(),
+      AttributeReference("cbyte", ByteType)(),
+      AttributeReference("cint", IntegerType)()
+    )
+
+    val expectedSize = 16
+    val statsPlan = OutputListAwareStatsTestPlan(
+      outputList = outputList,
+      rowCount = 2,
+      size = Some(expectedSize))
+
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+      val df = Dataset.ofRows(spark, statsPlan)
+        // add some map-like operations which optimizer will optimize away, 
and make a divergence
+        // for output between logical plan and optimized plan
+        // logical plan
+        // Project [cb#6 AS cbool#12, cby#7 AS cbyte#13, ci#8 AS cint#14]
+        // +- Project [cbool#0 AS cb#6, cbyte#1 AS cby#7, cint#2 AS ci#8]
+        //    +- OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16
+        // optimized plan
+        // OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16
+        .selectExpr("cbool AS cb", "cbyte AS cby", "cint AS ci")
+        .selectExpr("cb AS cbool", "cby AS cbyte", "ci AS cint")
+
+      // We can't leverage LogicalRDD.fromDataset here, since it triggers 
physical planning and
+      // there is no matching physical node for OutputListAwareStatsTestPlan.
+      val optimizedPlan = df.queryExecution.optimizedPlan
+      val rewrite = LogicalRDD.buildOutputAssocForRewrite(optimizedPlan.output,
+        df.logicalPlan.output)
+      val logicalRDD = LogicalRDD(
+        df.logicalPlan.output, spark.sparkContext.emptyRDD[InternalRow], 
isStreaming = true)(
+        spark, Some(LogicalRDD.rewriteStatistics(optimizedPlan.stats, 
rewrite.get)), None)
+
+      val stats = logicalRDD.computeStats()
+      val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = 
Some(2),
+        attributeStats = buildExpectedColumnStats(logicalRDD.output))
+      assert(stats === expectedStats)
+
+      // This method re-issues expression IDs for all outputs. We expect 
column stats to be
+      // reflected as well.
+      val newLogicalRDD = logicalRDD.newInstance()
+      val newStats = newLogicalRDD.computeStats()
+      val newExpectedStats = Statistics(sizeInBytes = expectedSize, rowCount = 
Some(2),
+        attributeStats = buildExpectedColumnStats(newLogicalRDD.output))
+      assert(newStats === newExpectedStats)
+    }
+  }
+}
+
+/**
+ * This class is used for unit-testing. It's a logical plan whose output and 
stats are passed in.
+ */
+case class OutputListAwareStatsTestPlan(
+    outputList: Seq[Attribute],
+    rowCount: BigInt,
+    size: Option[BigInt] = None) extends LeafNode with MultiInstanceRelation {
+  override def output: Seq[Attribute] = outputList
+  override def computeStats(): Statistics = {
+    val columnInfo = outputList.map { attr =>
+      attr.dataType match {
+        case BooleanType =>
+          attr -> ColumnStat(
+            distinctCount = Some(2),
+            min = Some(false),
+            max = Some(true),
+            nullCount = Some(0),
+            avgLen = Some(1),
+            maxLen = Some(1))
+
+        case ByteType =>
+          attr -> ColumnStat(
+            distinctCount = Some(2),
+            min = Some(1),
+            max = Some(2),
+            nullCount = Some(0),
+            avgLen = Some(1),
+            maxLen = Some(1))
+
+        case _ =>
+          attr -> ColumnStat()
+      }
+    }
+    val attrStats = AttributeMap(columnInfo)
+
+    Statistics(
+      // If sizeInBytes is useless in testing, we just use a fake value
+      sizeInBytes = size.getOrElse(Int.MaxValue),
+      rowCount = Some(rowCount),
+      attributeStats = attrStats)
+  }
+  override def newInstance(): LogicalPlan = copy(outputList = 
outputList.map(_.newInstance()))
 }
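The SPARK-39834 test drives internal APIs (Dataset.ofRows, LogicalRDD) directly. From user code, the closest observable effect is the statistics attached to the optimized plan; a hedged sketch under the same session assumptions, where the printed numbers depend on the data and the CBO setting:

    spark.conf.set("spark.sql.cbo.enabled", "true")

    val df = Seq((true, 1.toByte, 1), (false, 2.toByte, 2)).toDF("cbool", "cbyte", "cint")
    val stats = df.queryExecution.optimizedPlan.stats

    println(s"sizeInBytes = ${stats.sizeInBytes}, rowCount = ${stats.rowCount}")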
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 734a4db89627..87ca3a07c4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.execution.columnar.InMemoryRelation
 import 
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, 
ExplainCommand}
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, 
SparkUserDefinedFunction, UserDefinedAggregateFunction}
-import org.apache.spark.sql.functions.{lit, struct, udaf, udf}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.test.SQLTestData._
@@ -56,6 +56,24 @@ private case class TimestampInstantType(t: Timestamp, 
instant: Instant)
 class UDFSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
 
+  test("udf") {
+    val foo = udf((a: Int, b: String) => a.toString + b)
+
+    checkAnswer(
+      // SELECT *, foo(key, value) FROM testData
+      testData.select($"*", foo($"key", $"value")).limit(3),
+      Row(1, "1", "11") :: Row(2, "2", "22") :: Row(3, "3", "33") :: Nil
+    )
+  }
+
+  test("callUDF without Hive Support") {
+    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
+    df.sparkSession.udf.register("simpleUDF", (v: Int) => v * v)
+    checkAnswer(
+      df.select($"id", callUDF("simpleUDF", $"value")), // test deprecated one
+      Row("id1", 1) :: Row("id2", 16) :: Row("id3", 25) :: Nil)
+  }
+
   test("built-in fixed arity expressions") {
     val df = spark.emptyDataFrame
     df.selectExpr("rand()", "randn()", "rand(5)", "randn(50)")
@@ -1092,4 +1110,77 @@ class UDFSuite extends QueryTest with SharedSparkSession 
{
      .lookupFunctionInfo(FunctionIdentifier("dummyUDF"))
    assert(expressionInfo.getClassName.contains("org.apache.spark.sql.UDFRegistration$$Lambda"))
   }
+
+  test("SPARK-11725: correctly handle null inputs for ScalaUDF") {
+    val df = sparkContext.parallelize(Seq(
+      java.lang.Integer.valueOf(22) -> "John",
+      null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name")
+
+    // passing null into the UDF that could handle it
+    val boxedUDF = udf[java.lang.Integer, java.lang.Integer] {
+      (i: java.lang.Integer) => if (i == null) -10 else null
+    }
+    checkAnswer(df.select(boxedUDF($"age")), Row(null) :: Row(-10) :: Nil)
+
+    spark.udf.register("boxedUDF",
+      (i: java.lang.Integer) => (if (i == null) -10 else null): 
java.lang.Integer)
+    checkAnswer(sql("select boxedUDF(null), boxedUDF(-1)"), Row(-10, null) :: 
Nil)
+
+    val primitiveUDF = udf((i: Int) => i * 2)
+    checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil)
+  }
+
+  test("SPARK-34829: Multiple applications of typed ScalaUDFs in higher order 
functions work") {
+    val reverse = udf((s: String) => s.reverse)
+    val reverse2 = udf((b: Bar2) => Bar2(b.s.reverse))
+
+    val df = Seq(Array("abc", "def")).toDF("array")
+    val test = df.select(transform(col("array"), s => reverse(s)))
+    checkAnswer(test, Row(Array("cba", "fed")) :: Nil)
+
+    val df2 = Seq(Array(Bar2("abc"), Bar2("def"))).toDF("array")
+    val test2 = df2.select(transform(col("array"), b => reverse2(b)))
+    checkAnswer(test2, Row(Array(Row("cba"), Row("fed"))) :: Nil)
+
+    val df3 = Seq(Map("abc" -> 1, "def" -> 2)).toDF("map")
+    val test3 = df3.select(transform_keys(col("map"), (s, _) => reverse(s)))
+    checkAnswer(test3, Row(Map("cba" -> 1, "fed" -> 2)) :: Nil)
+
+    val df4 = Seq(Map(Bar2("abc") -> 1, Bar2("def") -> 2)).toDF("map")
+    val test4 = df4.select(transform_keys(col("map"), (b, _) => reverse2(b)))
+    checkAnswer(test4, Row(Map(Row("cba") -> 1, Row("fed") -> 2)) :: Nil)
+
+    val df5 = Seq(Map(1 -> "abc", 2 -> "def")).toDF("map")
+    val test5 = df5.select(transform_values(col("map"), (_, s) => reverse(s)))
+    checkAnswer(test5, Row(Map(1 -> "cba", 2 -> "fed")) :: Nil)
+
+    val df6 = Seq(Map(1 -> Bar2("abc"), 2 -> Bar2("def"))).toDF("map")
+    val test6 = df6.select(transform_values(col("map"), (_, b) => reverse2(b)))
+    checkAnswer(test6, Row(Map(1 -> Row("cba"), 2 -> Row("fed"))) :: Nil)
+
+    val reverseThenConcat = udf((s1: String, s2: String) => s1.reverse ++ s2.reverse)
+    val reverseThenConcat2 = udf((b1: Bar2, b2: Bar2) => Bar2(b1.s.reverse ++ b2.s.reverse))
+
+    val df7 = Seq((Map(1 -> "abc", 2 -> "def"), Map(1 -> "ghi", 2 -> "jkl"))).toDF("map1", "map2")
+    val test7 =
+      df7.select(map_zip_with(col("map1"), col("map2"), (_, s1, s2) => reverseThenConcat(s1, s2)))
+    checkAnswer(test7, Row(Map(1 -> "cbaihg", 2 -> "fedlkj")) :: Nil)
+
+    val df8 = Seq((Map(1 -> Bar2("abc"), 2 -> Bar2("def")),
+      Map(1 -> Bar2("ghi"), 2 -> Bar2("jkl")))).toDF("map1", "map2")
+    val test8 =
+      df8.select(map_zip_with(col("map1"), col("map2"), (_, b1, b2) => reverseThenConcat2(b1, b2)))
+    checkAnswer(test8, Row(Map(1 -> Row("cbaihg"), 2 -> Row("fedlkj"))) :: Nil)
+
+    val df9 = Seq((Array("abc", "def"), Array("ghi", "jkl"))).toDF("array1", "array2")
+    val test9 =
+      df9.select(zip_with(col("array1"), col("array2"), (s1, s2) => reverseThenConcat(s1, s2)))
+    checkAnswer(test9, Row(Array("cbaihg", "fedlkj")) :: Nil)
+
+    val df10 = Seq((Array(Bar2("abc"), Bar2("def")), Array(Bar2("ghi"), Bar2("jkl"))))
+      .toDF("array1", "array2")
+    val test10 =
+      df10.select(zip_with(col("array1"), col("array2"), (b1, b2) => reverseThenConcat2(b1, b2)))
+    checkAnswer(test10, Row(Array(Row("cbaihg"), Row("fedlkj"))) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 90b341ae1f2c..7f886940473d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.File
 import java.net.URI
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
@@ -25,6 +27,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
   import TestPaths._
@@ -158,6 +161,52 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
     val expectMessage = "No FileSystem for scheme nonexistentFs"
     assert(message.filterNot(Set(':', '"').contains) == expectMessage)
   }
+
+  test("SPARK-13774: Check error message for non existent path without globbed paths") {
+    val uuid = UUID.randomUUID().toString
+    val baseDir = Utils.createTempDir()
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read.format("csv").load(
+          new File(baseDir, "file").getAbsolutePath,
+          new File(baseDir, "file2").getAbsolutePath,
+          new File(uuid, "file3").getAbsolutePath,
+          uuid).rdd
+      },
+      errorClass = "PATH_NOT_FOUND",
+      parameters = Map("path" -> "file:.*"),
+      matchPVals = true
+    )
+  }
+
+  test("SPARK-13774: Check error message for not existent globbed paths") {
+    // Non-existent initial path component:
+    val nonExistentBasePath = "/" + UUID.randomUUID().toString
+    assert(!new File(nonExistentBasePath).exists())
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read.format("text").load(s"$nonExistentBasePath/*")
+      },
+      errorClass = "PATH_NOT_FOUND",
+      parameters = Map("path" -> s"file:$nonExistentBasePath/*")
+    )
+
+    // Existent initial path component, but no matching files:
+    val baseDir = Utils.createTempDir()
+    val childDir = Utils.createTempDir(baseDir.getAbsolutePath)
+    assert(childDir.exists())
+    try {
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.json(s"${baseDir.getAbsolutePath}/*/*-xyz.json").rdd
+        },
+        errorClass = "PATH_NOT_FOUND",
+        parameters = Map("path" -> s"file:${baseDir.getAbsolutePath}/*/*-xyz.json")
+      )
+    } finally {
+      Utils.deleteRecursively(baseDir)
+    }
+  }
 }
 
 object TestPaths {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 5db12c124f91..ad113f1bb896 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
 import java.util.Locale
+import java.util.concurrent.atomic.AtomicLong
 
 import com.fasterxml.jackson.core.JsonFactory
 import org.apache.commons.lang3.exception.ExceptionUtils
@@ -32,9 +33,11 @@ import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
@@ -3703,6 +3706,58 @@ abstract class JsonSuite
     assert(JSONOptions.getAlternativeOption("charset").contains("encoding"))
     assert(JSONOptions.getAlternativeOption("dateFormat").isEmpty)
   }
+
+  test("SPARK-25159: json schema inference should only trigger one job") {
+    withTempPath { path =>
+      // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which
+      // triggers one Spark job per RDD partition.
+      Seq(1 -> "a", 2 -> "b").toDF("i", "p")
+        // The data set has 2 partitions, so Spark will write at least 2 json files.
+        // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
+        // partitions.
+        .write.partitionBy("p")
+        .option("compression", GZIP.lowerCaseName()).json(path.getCanonicalPath)
+
+      val numJobs = new AtomicLong(0)
+      sparkContext.addSparkListener(new SparkListener {
+        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+          numJobs.incrementAndGet()
+        }
+      })
+
+      val df = spark.read.json(path.getCanonicalPath)
+      assert(df.columns === Array("i", "p"))
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      assert(numJobs.get() == 1L)
+    }
+  }
+
+  test("SPARK-35320: Reading JSON with key type different to String in a map should fail") {
+    Seq(
+      (MapType(IntegerType, StringType), """{"1": "test"}"""),
+      (StructType(Seq(StructField("test", MapType(IntegerType, StringType)))),
+        """"test": {"1": "test"}"""),
+      (ArrayType(MapType(IntegerType, StringType)), """[{"1": "test"}]"""),
+      (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" : "test"}}""")
+    ).foreach { case (schema, jsonData) =>
+      withTempDir { dir =>
+        val colName = "col"
+        val msg = "can only contain STRING as a key type for a MAP"
+
+        val thrown1 = intercept[AnalysisException](
+          spark.read.schema(StructType(Seq(StructField(colName, schema))))
+            .json(Seq(jsonData).toDS()).collect())
+        assert(thrown1.getMessage.contains(msg))
+
+        val jsonDir = new File(dir, "json").getCanonicalPath
+        Seq(jsonData).toDF(colName).write.json(jsonDir)
+        val thrown2 = intercept[AnalysisException](
+          spark.read.schema(StructType(Seq(StructField(colName, schema))))
+            .json(jsonDir).collect())
+        assert(thrown2.getMessage.contains(msg))
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
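
Note on the SPARK-25159 test above: the listener-based job counting it uses can be lifted into a small reusable helper. The sketch below is illustrative only and not part of this commit; the names JobCounter and countJobs are made up, and because SparkContext.listenerBus is Spark-internal, the helper assumes it is compiled under an org.apache.spark package, just like the test suites in this diff.

    package org.apache.spark.sql

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Illustrative helper (not part of the patch): counts the Spark jobs triggered by `body`.
    object JobCounter {
      def countJobs(spark: SparkSession)(body: => Unit): Long = {
        val numJobs = new AtomicLong(0)
        val listener = new SparkListener {
          override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = numJobs.incrementAndGet()
        }
        spark.sparkContext.addSparkListener(listener)
        try {
          body
          // Drain queued listener events so the counter reflects every finished job.
          spark.sparkContext.listenerBus.waitUntilEmpty()
          numJobs.get()
        } finally {
          spark.sparkContext.removeSparkListener(listener)
        }
      }
    }

A caller would wrap the action under test, for example countJobs(spark) { spark.read.json(path).columns }, and assert on the returned count, which is what the test above does inline.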


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
