This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d4d6df2 [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959 d4d6df2 is described below commit d4d6df2f7d97168f0f3073aa42608294030ece55 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Thu Jan 31 14:32:31 2019 +0800 [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959 ## What changes were proposed in this pull request? This PR reverts the JSON count-optimization part of #21909. We cannot distinguish the cases below without parsing: ``` [{...}, {...}] ``` ``` [] ``` ``` {...} ``` ```bash # empty string ``` when we `count()`. A single input (of type IN) can contain zero records, one record, or multiple records, depending on its content, so the number of rows cannot be known without parsing it. See also https://github.com/apache/spark/pull/23665#discussion_r251276720. ## How was this patch tested? Manually tested. Closes #23667 from HyukjinKwon/revert-SPARK-24959. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/sql/catalyst/csv/UnivocityParser.scala | 16 +++++++++++----- .../spark/sql/catalyst/expressions/csvExpressions.scala | 3 +-- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 3 +-- .../spark/sql/catalyst/util/FailureSafeParser.scala | 11 ++--------- sql/core/benchmarks/JSONBenchmark-results.txt | 1 - .../scala/org/apache/spark/sql/DataFrameReader.scala | 6 ++---- .../sql/execution/datasources/json/JsonDataSource.scala | 6 ++---- .../sql/execution/datasources/json/JsonBenchmark.scala | 3 --- 8 files changed, 19 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 82a5b3c..79dff6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -188,11 +188,19 @@ class UnivocityParser( } } + private val doParse = if (requiredSchema.nonEmpty) { + (input: String) => convert(tokenizer.parseLine(input)) + } else { + // If `columnPruning` enabled and partition attributes scanned only, + // `schema` gets empty. + (_: String) => InternalRow.empty + } + /** * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): InternalRow = doParse(input) private val getToken = if (options.columnPruning) { (tokens: Array[String], index: Int) => tokens(index) @@ -282,8 +290,7 @@ private[sql] object UnivocityParser { input => Seq(parser.convert(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) val handleHeader: () => Unit = () => headerChecker.checkHeaderColumnNames(tokenizer) @@ -336,8 +343,7 @@ private[sql] object UnivocityParser { input => Seq(parser.parse(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) filteredLines.flatMap(safeParser.parse) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 83b0299..65b10f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -117,8 +117,7 @@ case class CsvToStructs( input => Seq(rawParser.parse(input)), mode, nullableSchema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + 
parsedOptions.columnNameOfCorruptRecord) } override def dataType: DataType = nullableSchema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 3403349..655e44e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -582,8 +582,7 @@ case class JsonToStructs( input => rawParser.parse(input, createParser, identity[UTF8String]), mode, parserSchema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + parsedOptions.columnNameOfCorruptRecord) } override def dataType: DataType = nullableSchema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 76745b1..361c8b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -27,8 +27,7 @@ class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, schema: StructType, - columnNameOfCorruptRecord: String, - isMultiLine: Boolean) { + columnNameOfCorruptRecord: String) { private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) @@ -56,15 +55,9 @@ class FailureSafeParser[IN]( } } - private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty - def parse(input: IN): Iterator[InternalRow] = { try { - if (skipParsing) { - Iterator.single(InternalRow.empty) - } else { - rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) - } + 
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } catch { case e: BadRecordException => mode match { case PermissiveMode => diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt index 947f57d..f16e60c 100644 --- a/sql/core/benchmarks/JSONBenchmark-results.txt +++ b/sql/core/benchmarks/JSONBenchmark-results.txt @@ -41,7 +41,6 @@ Select a subset of 10 columns: Best/Avg Time(ms) Rate(M/s) Per Ro ------------------------------------------------------------------------------------------------ Select 10 columns + count() 19539 / 19896 0.5 1953.9 1.0X Select 1 column + count() 16412 / 16445 0.6 1641.2 1.2X -count() 2783 / 2801 3.6 278.3 7.0X Preparing data for benchmarking ... Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 2b15217..713c9a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -468,8 +468,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => rawParser.parse(input, createParser, UTF8String.fromString), parsedOptions.parseMode, schema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) @@ -538,8 +537,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => Seq(rawParser.parse(input)), parsedOptions.parseMode, schema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
csvDataset.isStreaming) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 456f08a..7ec2267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -140,8 +140,7 @@ object TextInputJsonDataSource extends JsonDataSource { input => parser.parse(input, textParser, textToUTF8String), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) linesReader.flatMap(safeParser.parse) } @@ -225,8 +224,7 @@ object MultiLineJsonDataSource extends JsonDataSource { input => parser.parse[InputStream](input, streamParser, partitionedFileString), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) safeParser.parse( CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala index 27f7023..25f7620 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala @@ -217,9 +217,6 @@ object JSONBenchmark extends SqlBasedBenchmark { benchmark.addCase(s"Select 1 column + count()", numIters) { _ => ds.select($"col1").filter((_: Row) => true).count() } - benchmark.addCase(s"count()", numIters) { _ => - ds.count() - } benchmark.run() } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org