This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2a83431  [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by #21909
2a83431 is described below

commit 2a8343121e62aabe5c69d1e20fbb2c01e2e520e7
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Feb 1 10:22:05 2019 +0800

    [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by #21909
    
    ## What changes were proposed in this pull request?
    
    This PR reverts the JSON count optimization part of #21909.
    
    We cannot distinguish the cases below without parsing:
    
    ```
    [{...}, {...}]
    ```
    
    ```
    []
    ```
    
    ```
    {...}
    ```
    
    ```bash
    # empty string
    ```
    
    when we `count()`. One input line (an `IN` value) can yield 0, 1, or multiple records, depending on the input.
    
    See also https://github.com/apache/spark/pull/23665#discussion_r251276720.
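    
    As an illustration (not part of the patch), the ambiguity is easy to reproduce in `spark-shell`, assuming a running `SparkSession` named `spark`: the number of records per line is only known after parsing, so a per-line shortcut cannot produce a correct `count()`.
    
    ```scala
    import spark.implicits._
    
    // Two input lines: a JSON array (3 records) and an empty line (0 records).
    val lines = Seq("""[{"a": 1}, {"a": 2}, {"a": 3}]""", "").toDS()
    
    // 3 records from 2 lines; counting lines instead of records would give 2.
    spark.read.json(lines).count()
    ```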
    
    ## How was this patch tested?
    
    Manually tested.
    
    Closes #23708 from HyukjinKwon/SPARK-26745-backport.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/DataFrameReader.scala     |  6 ++----
 .../sql/execution/datasources/FailureSafeParser.scala    | 11 ++---------
 .../sql/execution/datasources/csv/UnivocityParser.scala  | 16 +++++++++++-----
 .../sql/execution/datasources/json/JsonDataSource.scala  |  6 ++----
 .../sql/execution/datasources/json/JsonBenchmarks.scala  |  4 ----
 5 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 869c584..e9278a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -450,8 +450,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -526,8 +525,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,
         schema,
-        parsedOptions.columnNameOfCorruptRecord,
-        parsedOptions.multiLine)
+        parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 90e8166..e618f17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -29,8 +29,7 @@ class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
     schema: StructType,
-    columnNameOfCorruptRecord: String,
-    isMultiLine: Boolean) {
+    columnNameOfCorruptRecord: String) {
 
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -58,15 +57,9 @@ class FailureSafeParser[IN](
     }
   }
 
-  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
-
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      if (skipParsing) {
-        Iterator.single(InternalRow.empty)
-      } else {
-        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-      }
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
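
For context, here is a self-contained sketch of the failure-safe pattern this class implements, with the skip-parsing shortcut removed. The names are simplified and illustrative, not Spark's actual API (the real class produces `InternalRow` iterators and wraps fail-fast errors in a `SparkException`):

```scala
sealed trait ParseMode
case object PermissiveMode extends ParseMode
case object DropMalformedMode extends ParseMode
case object FailFastMode extends ParseMode

class MiniFailureSafeParser[IN](rawParser: IN => Seq[String], mode: ParseMode) {
  def parse(input: IN): Iterator[String] = {
    try {
      // After the revert, every input goes through the raw parser;
      // there is no skip-parsing shortcut for count()-only scans.
      rawParser(input).iterator
    } catch {
      case e: RuntimeException => mode match {
        case PermissiveMode    => Iterator(s"corrupt record: $input") // keep a placeholder row
        case DropMalformedMode => Iterator.empty                      // silently drop the record
        case FailFastMode      => throw e                             // surface the error
      }
    }
  }
}
```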
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 9088d43..42e3964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,11 +203,19 @@ class UnivocityParser(
     }
   }
 
+  private val doParse = if (requiredSchema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` is enabled and only partition attributes are
+    // scanned, `schema` is empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row
    * (if the record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -290,8 +298,7 @@ private[csv] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
       safeParser.parse(tokens)
     }.flatten
@@ -339,8 +346,7 @@ private[csv] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
 }
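
The `doParse` indirection added above selects the parse function once, at construction time, instead of re-checking `requiredSchema` for every record. A standalone sketch of the same dispatch-once pattern, with illustrative names (this is not the real parser; paste into a Scala REPL to try it):

```scala
case class MiniRow(values: Seq[String])

// Decide how to parse a line a single time, based on whether any
// columns are actually required, rather than branching per record.
def makeParser(requiredColumns: Seq[String]): String => MiniRow =
  if (requiredColumns.nonEmpty) {
    line => MiniRow(line.split(",").toSeq) // real tokenizing work
  } else {
    // Nothing is projected (e.g. only partition columns are read),
    // so every line maps to an empty row.
    _ => MiniRow(Nil)
  }

val parse = makeParser(Seq("a", "b"))
println(parse("1,2")) // MiniRow(List(1, 2))
```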
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 76f5837..d6c5888 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -139,8 +139,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     linesReader.flatMap(safeParser.parse)
   }
 
@@ -224,8 +223,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
      input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
index a2b747e..5592aa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
@@ -194,9 +194,6 @@ object JSONBenchmarks {
       benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", 3) { _ =>
-        ds.count()
-      }
 
       /*
       Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
@@ -205,7 +202,6 @@ object JSONBenchmarks {
       
      ---------------------------------------------------------------------------------------------
      Select 10 columns + count()               9961 / 10006          1.0         996.1       1.0X
      Select 1 column + count()                  8355 / 8470          1.2         835.5       1.2X
-      count()                                    2104 / 2156          4.8         210.4       4.7X
       */
       benchmark.run()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
