This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d6df2  [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
d4d6df2 is described below

commit d4d6df2f7d97168f0f3073aa42608294030ece55
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Jan 31 14:32:31 2019 +0800

    [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
    
    ## What changes were proposed in this pull request?
    
    This PR reverts the JSON count optimization part of #21909.
    
    We cannot distinguish the cases below without parsing:
    
    ```
    [{...}, {...}]
    ```
    
    ```
    []
    ```
    
    ```
    {...}
    ```
    
    ```bash
    # empty string
    ```
    
    when we `count()`. One line (input: `IN`) can yield 0, 1, or multiple records, depending on the input.
    
    See also https://github.com/apache/spark/pull/23665#discussion_r251276720.
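    
    A minimal sketch of why parsing cannot be skipped (the file path and data below are illustrative):
    
    ```scala
    // /tmp/example.json, containing two physical lines:
    //   [{"a": 1}, {"a": 2}, {"a": 3}]
    //   []
    spark.read.json("/tmp/example.json").count()
    // A top-level array yields one row per element and an empty array yields
    // no rows, so the correct count is 3. The reverted optimization skipped
    // parsing and emitted one empty row per line, which would return 2 here.
    ```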
    
    ## How was this patch tested?
    
    Manually tested.
    
    Closes #23667 from HyukjinKwon/revert-SPARK-24959.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala  | 16 +++++++++++-----
 .../spark/sql/catalyst/expressions/csvExpressions.scala  |  3 +--
 .../spark/sql/catalyst/expressions/jsonExpressions.scala |  3 +--
 .../spark/sql/catalyst/util/FailureSafeParser.scala      | 11 ++---------
 sql/core/benchmarks/JSONBenchmark-results.txt            |  1 -
 .../scala/org/apache/spark/sql/DataFrameReader.scala     |  6 ++----
 .../sql/execution/datasources/json/JsonDataSource.scala  |  6 ++----
 .../sql/execution/datasources/json/JsonBenchmark.scala   |  3 ---
 8 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 82a5b3c..79dff6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -188,11 +188,19 @@ class UnivocityParser(
     }
   }
 
+  private val doParse = if (requiredSchema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` is enabled and only partition attributes are
+    // scanned, `schema` is empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -282,8 +290,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
@@ -336,8 +343,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
 }
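
For context, the `doParse` guard added in `UnivocityParser` above is what keeps empty-schema CSV scans working after the revert. A minimal sketch of a query that hits it (path, layout, and column name are hypothetical; assumes CSV column pruning is enabled, its default):

```scala
// Hypothetical partitioned CSV layout:
//   /tmp/t/date=2019-01-31/part-00000.csv
// Selecting only the partition column leaves the required schema empty for
// the file scan, so parse() returns InternalRow.empty without tokenizing.
spark.read
  .option("header", "true")
  .csv("/tmp/t")
  .select("date")
  .count()
```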
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 83b0299..65b10f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -117,8 +117,7 @@ case class CsvToStructs(
       input => Seq(rawParser.parse(input)),
       mode,
       nullableSchema,
-      parsedOptions.columnNameOfCorruptRecord,
-      parsedOptions.multiLine)
+      parsedOptions.columnNameOfCorruptRecord)
   }
 
   override def dataType: DataType = nullableSchema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 3403349..655e44e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -582,8 +582,7 @@ case class JsonToStructs(
       input => rawParser.parse(input, createParser, identity[UTF8String]),
       mode,
       parserSchema,
-      parsedOptions.columnNameOfCorruptRecord,
-      parsedOptions.multiLine)
+      parsedOptions.columnNameOfCorruptRecord)
   }
 
   override def dataType: DataType = nullableSchema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 76745b1..361c8b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -27,8 +27,7 @@ class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
     schema: StructType,
-    columnNameOfCorruptRecord: String,
-    isMultiLine: Boolean) {
+    columnNameOfCorruptRecord: String) {
 
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -56,15 +55,9 @@ class FailureSafeParser[IN](
     }
   }
 
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
-
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-     if (skipParsing) {
-       Iterator.single(InternalRow.empty)
-     } else {
-       rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-     }
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 947f57d..f16e60c 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -41,7 +41,6 @@ Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Ro
 
------------------------------------------------------------------------------------------------
 Select 10 columns + count()                 19539 / 19896          0.5        1953.9       1.0X
 Select 1 column + count()                   16412 / 16445          0.6        1641.2       1.2X
-count()                                       2783 / 2801          3.6         278.3       7.0X
 
 Preparing data for benchmarking ...
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 2b15217..713c9a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -468,8 +468,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -538,8 +537,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 456f08a..7ec2267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -140,8 +140,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     linesReader.flatMap(safeParser.parse)
   }
 
@@ -225,8 +224,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
       CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index 27f7023..25f7620 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -217,9 +217,6 @@ object JSONBenchmark extends SqlBasedBenchmark {
       benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", numIters) { _ =>
-        ds.count()
-      }
 
       benchmark.run()
     }

