This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f4c8e42479 [SPARK-40775][SQL] Fix duplicate description entries for V2 file scans
1f4c8e42479 is described below

commit 1f4c8e42479a9c9df37864507dfd39d9819fe83c
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Mon Dec 12 18:15:58 2022 +0800

    [SPARK-40775][SQL] Fix duplicate description entries for V2 file scans
    
    ### What changes were proposed in this pull request?
    
    Remove the overridden `description` method from the V2 file sources. `FileScan` already uses all of the metadata to build the description, so adding the same fields again in an overridden `description` produces duplicate entries.
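    
    The following standalone Scala sketch (hypothetical `BaseScan`/`ScanBefore`/`ScanAfter` types, not Spark's actual `FileScan` code) illustrates the mechanism: the base `description()` already renders every entry returned by `getMetaData()`, so a scan that overrides both methods prints `PushedFilters` twice, while a scan that only overrides `getMetaData()` prints it once.
    
    ```
    // Minimal sketch of the duplication (hypothetical classes, not Spark's FileScan).
    trait BaseScan {
      def getMetaData(): Map[String, String] = Map("Format" -> "demo")

      // The base description already renders every metadata entry.
      def description(): String =
        getClass.getSimpleName + " " +
          getMetaData().toSeq.sorted.map { case (k, v) => s"$k: $v" }.mkString(", ")
    }

    // Before: overriding both getMetaData() and description() repeats "PushedFilters".
    class ScanBefore extends BaseScan {
      private val pushedFilters = Seq("IsNotNull(value)")
      override def getMetaData(): Map[String, String] =
        super.getMetaData() ++ Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
      override def description(): String =
        super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
    }

    // After: overriding only getMetaData() lets the base description render it once.
    class ScanAfter extends BaseScan {
      private val pushedFilters = Seq("IsNotNull(value)")
      override def getMetaData(): Map[String, String] =
        super.getMetaData() ++ Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
    }

    object DuplicateDescriptionDemo extends App {
      println(new ScanBefore().description()) // PushedFilters appears twice
      println(new ScanAfter().description())  // PushedFilters appears once
    }
    ```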
    
    ### Why are the changes needed?
    
    Example parquet scan from the agg pushdown suite:
    
    Before:
    ```
    +- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count( [...]
    ```
    
    After:
    ```
     +- BatchScan parquet file:/...[min(_3)#814, max(_3)#815, min(_1)#816, max(_1)#817, count(*)#818L, count(_1)#819L, count(_2)#820L, count(_3)#821L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedAggregation: [MIN(_3), MAX(_3), MIN(_1), MAX(_1), COUNT(*), COUNT(_1), COUNT(_2), COUNT(_3)], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<min(_3):int,max(_3):int,min(_1):int,max(_1):int,count(*):bigint,count [...]
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Just a description change in the explain output.
    
    ### How was this patch tested?
    
    Updated a few UTs to account for the changed explain string.
    
    Closes #38229 from Kimahriman/remove-file-source-description.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/v2/avro/AvroScan.scala |  4 ----
 .../sql/execution/datasources/v2/csv/CSVScan.scala    |  4 ----
 .../sql/execution/datasources/v2/json/JsonScan.scala  |  4 ++--
 .../sql/execution/datasources/v2/orc/OrcScan.scala    |  6 ------
 .../datasources/v2/parquet/ParquetScan.scala          |  6 ------
 .../scala/org/apache/spark/sql/ExplainSuite.scala     | 19 +++++--------------
 .../FileSourceAggregatePushDownSuite.scala            |  3 ++-
 7 files changed, 9 insertions(+), 37 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
index d0f38c12427..763b9abe4f9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
@@ -70,10 +70,6 @@ case class AvroScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index d81223b48a5..734f8165aff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -91,10 +91,6 @@ case class CSVScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 9ab367136fc..c9a3a6f5e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -91,7 +91,7 @@ case class JsonScan(
 
   override def hashCode(): Int = super.hashCode()
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index ccb9ca9c6b3..072ab26774e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -92,12 +92,6 @@ case class OrcScan(
     ("[]", "[]")
   }
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index ff0b38880fd..619a8fe66e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -127,12 +127,6 @@ case class ParquetScan(
     ("[]", "[]")
   }
 
-  override def description(): String = {
-    super.description() + ", PushedFilters: " + seqToString(pushedFilters) +
-      ", PushedAggregation: " + pushedAggregationsStr +
-      ", PushedGroupBy: " + pushedGroupByStr
-  }
-
   override def getMetaData(): Map[String, String] = {
     super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters)) ++
       Map("PushedAggregation" -> pushedAggregationsStr) ++
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 50c3b8fbf48..b5353455dc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -462,17 +462,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
     withTempDir { dir =>
       Seq("parquet", "orc", "csv", "json").foreach { fmt =>
         val basePath = dir.getCanonicalPath + "/" + fmt
-        val pushFilterMaps = Map (
-          "parquet" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "orc" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "csv" ->
-            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
-          "json" ->
-            "|remove_marker"
-        )
-        val expected_plan_fragment1 =
+
+        val expectedPlanFragment =
           s"""
              |\\(1\\) BatchScan $fmt file:$basePath
              |Output \\[2\\]: \\[value#x, id#x\\]
@@ -480,9 +471,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
              |Format: $fmt
              |Location: InMemoryFileIndex\\([0-9]+ paths\\)\\[.*\\]
              |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
-             ${pushFilterMaps.get(fmt).get}
+             |PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]
              |ReadSchema: struct\\<value:int\\>
-             |""".stripMargin.replaceAll("\nremove_marker", "").trim
+             |""".stripMargin.trim
 
         spark.range(10)
           .select(col("id"), col("id").as("value"))
@@ -500,7 +491,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
             .format(fmt)
             .load(basePath).where($"id" > 1 && $"value" > 2)
           val normalizedOutput = getNormalizedExplain(df, FormattedMode)
-          assert(expected_plan_fragment1.r.findAllMatchIn(normalizedOutput).length == 1)
+          assert(expectedPlanFragment.r.findAllMatchIn(normalizedOutput).length == 1)
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
index a68d9b951b7..e8fae210fa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala
@@ -347,7 +347,8 @@ trait FileSourceAggregatePushDownSuite
         spark.read.format(format).load(file.getCanonicalPath).createOrReplaceTempView("test")
         Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(aggPushDownEnabledKey -> "true",
-            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader,
+            SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") {
 
            val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
              "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
