[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r211129911
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +58,15 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = !isMultiLine && mode == PermissiveMode && 
schema.isEmpty
--- End diff --

not a big deal but I would leave a comment to explain why it's permissive 
and non-miltiline only. I assume counts are known when it's actually parsed for 
multiline cases, and counts should be given in any case when the mode is 
permissive, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21909


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r211075385
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
   input => parser.parse[InputStream](input, streamParser, 
partitionedFileString),
   parser.options.parseMode,
   schema,
-  parser.options.columnNameOfCorruptRecord)
+  parser.options.columnNameOfCorruptRecord,
+  optimizeEmptySchema = false)
--- End diff --

renamed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r211075384
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
 "This usually speeds up commands that need to list many 
directories.")
   .booleanConf
   .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+buildConf("spark.sql.legacy.bypassParserForEmptySchema")
--- End diff --

It seems we don't need it anymore


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r211045699
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
   input => parser.parse[InputStream](input, streamParser, 
partitionedFileString),
   parser.options.parseMode,
   schema,
-  parser.options.columnNameOfCorruptRecord)
+  parser.options.columnNameOfCorruptRecord,
+  optimizeEmptySchema = false)
--- End diff --

Could we rename `optimizeEmptySchema ` to `isMultiLine`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r211045061
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
 "This usually speeds up commands that need to list many 
directories.")
   .booleanConf
   .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+buildConf("spark.sql.legacy.bypassParserForEmptySchema")
--- End diff --

If no behavior change, do we still need this conf?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210767018
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }
 
-
   test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
-val fileName = "test-data/utf16LE.json"
-val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
-val exception = intercept[SparkException] {
-  spark.read.schema(schema)
-.option("mode", "FAILFAST")
-.option("multiline", "true")
-.options(Map("encoding" -> "UTF-16BE"))
-.json(testFile(fileName))
-.count()
+def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+  var result: Long = -1
+  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> 
bypassParser.toString) {
+val fileName = "test-data/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+result = spark.read.schema(schema)
+  .option("mode", "FAILFAST")
--- End diff --

This sounds good! Let us enable it only when PERMISSIVE is on. You know, 
our default mode is PERMISSIVE. This should benefit most users. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210765672
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1894,6 +1894,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
   - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
   - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and 
temporary files are not counted as data files when calculating table size 
during Statistics computation.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse 
input lines if the required schema pushed down to the datasources is empty. The 
schema can be empty in the case of the count() action. For example, Spark 2.3 
and earlier versions failed on JSON files with invalid encoding but Spark 2.4 
returns total number of lines in the file. To restore the previous behavior 
when the underlying parser is always invoked even for the empty schema, set 
`true` to `spark.sql.legacy.bypassParserForEmptySchema`. This option will be 
removed in Spark 3.0.
--- End diff --

Is it right based on what you said 
https://github.com/apache/spark/pull/21909#discussion_r210704902?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-16 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210704902
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }
 
-
   test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
-val fileName = "test-data/utf16LE.json"
-val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
-val exception = intercept[SparkException] {
-  spark.read.schema(schema)
-.option("mode", "FAILFAST")
-.option("multiline", "true")
-.options(Map("encoding" -> "UTF-16BE"))
-.json(testFile(fileName))
-.count()
+def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+  var result: Long = -1
+  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> 
bypassParser.toString) {
+val fileName = "test-data/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+result = spark.read.schema(schema)
+  .option("mode", "FAILFAST")
--- End diff --

> Does the mode matter?

I just want to have an explicit error in the test instead of `0` for 
`count()` ( `DROPMALFORMED`), or full table of nulls or an exception 
(`PERMISSIVE`) since an exception is expected result.

> What happened if users use DROPMALFORMED before this PR?

It depends on `multiLine`. If it is `true`, behaviour before and after PR 
is the same since the optimization doesn't impact on the `multiLine` mode. For 
`multiLine` equals to `false`, after the PR the result is `5` (total number of 
lines), before the PR - `0` in the `DROPMALFORMED` mode.

We can enable this optimization for the `PERMISSIVE` mode only to exclude 
any deviation in outputs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210693829
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }
 
-
   test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
-val fileName = "test-data/utf16LE.json"
-val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
-val exception = intercept[SparkException] {
-  spark.read.schema(schema)
-.option("mode", "FAILFAST")
-.option("multiline", "true")
-.options(Map("encoding" -> "UTF-16BE"))
-.json(testFile(fileName))
-.count()
+def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+  var result: Long = -1
+  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> 
bypassParser.toString) {
+val fileName = "test-data/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+result = spark.read.schema(schema)
+  .option("mode", "FAILFAST")
--- End diff --

Does the mode matter? What happened if users use `DROPMALFORMED ` before 
this PR? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210666117
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
 "This usually speeds up commands that need to list many 
directories.")
   .booleanConf
   .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+buildConf("spark.sql.legacy.bypassParserForEmptySchema")
+  .doc("If required schema passed to a text datasource is empty, the 
parameter controls " +
+"invocation of underlying parser. For example, if it is set to 
false, uniVocity parser " +
+"is invoke by CSV datasource or Jackson parser by JSON datasource. 
By default, it is set " +
--- End diff --

`invoke` -> `invoked`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-15 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r210350101
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse 
input lines if the required schema pushed down to the datasources is empty. The 
schema can be empty in the case of count(), for example. To set `true` to 
`spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior 
when the underlying parser is always invoked even for the empty schema. This 
option will be removed in Spark 3.0.
--- End diff --

> Does it also mean the result of count() can change if the json/csv files 
contain malformed records?

@cloud-fan Only the difference I have found so far is when a whole JSON 
file is invalid i.e it has wrong enconding, for example. In other cases 
`count()` returns the same.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-14 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r209927964
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

I added the tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-14 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r209916711
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 
 
   test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
-val fileName = "test-data/utf16LE.json"
-val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
-val exception = intercept[SparkException] {
-  spark.read.schema(schema)
-.option("mode", "FAILFAST")
-.option("multiline", "true")
-.options(Map("encoding" -> "UTF-16BE"))
-.json(testFile(fileName))
-.count()
-}
-val errMsg = exception.getMessage
+withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") {
--- End diff --

I will revert changes for the test because it enables multiLine mode for 
which the optimization is not applicable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-06 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r208022783
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse 
input lines if the required schema pushed down to the datasources is empty. The 
schema can be empty in the case of count(), for example. To set `true` to 
`spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior 
when the underlying parser is always invoked even for the empty schema. This 
option will be removed in Spark 3.0.
--- End diff --

For CSV, the result should be the same since we don't invoke the parser 
when CSV column pruning is enabled (by default). For JSON, result can be 
different like in the test which I changed in the PR. We call Jackson's method 
`nextToken()` even for empty schema, and besides of that the Jackson parser 
touches the input stream on instantiation. I will add a few tests for JSON as 
@gatorsmile asked.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207955299
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by 
default but ignores table properties like `TBLPROPERTIES (parquet.compression 
'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES 
(orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, 
too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties 
while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id 
int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would 
generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, 
the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for 
better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, 
too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with 
Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's 
ORC data source table and ORC vectorization would be applied. To set `false` to 
`spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse 
input lines if the required schema pushed down to the datasources is empty. The 
schema can be empty in the case of count(), for example. To set `true` to 
`spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior 
when the underlying parser is always invoked even for the empty schema. This 
option will be removed in Spark 3.0.
--- End diff --

Let's highlight the behavior changing, i.e. malformed records won't be 
reported when doing `count()`. Does it also mean the result of `count()` can 
change if the json/csv files contain malformed records?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207850329
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 
 
   test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
-val fileName = "test-data/utf16LE.json"
-val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
-val exception = intercept[SparkException] {
-  spark.read.schema(schema)
-.option("mode", "FAILFAST")
-.option("multiline", "true")
-.options(Map("encoding" -> "UTF-16BE"))
-.json(testFile(fileName))
-.count()
-}
-val errMsg = exception.getMessage
+withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") {
--- End diff --

How about CSV? Could you add the same one too?

Also, we need to add the verification logic when the conf is true. 
```
Seq(true, false).foreach { optimizeEmptySchema =>
  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> 
optimizeEmptySchema.toString) {
  ...
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-05 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207738315
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
 "are performed before any UNION, EXCEPT and MINUS operations.")
   .booleanConf
   .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = 
buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

I renamed it to `spark.sql.legacy.bypassParserForEmptySchema`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207701331
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
 "are performed before any UNION, EXCEPT and MINUS operations.")
   .booleanConf
   .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = 
buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

Let us get rid of this in the next release. Mark it as an internal and use 
the legacy scheme. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207032024
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

both? 

If we introduce a behavior change, we need to document it in the migration 
guide and add a conf. Users can do the conf to revert back to the previous 
behaviors. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207005019
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

> ... when the files having broken records?

Syntactically broken or semantically (wrong types for example)?

> Any behavior change after this PR?

We have many tests in `CSVSuite` and `JSONSuite` for broken records. I have 
found behavior change in only one case: 
https://github.com/apache/spark/pull/21909/files#diff-fde14032b0e6ef8086461edf79a27c5dL2227
 . This is due to `Jackson` parser touches a few first bytes in the input 
stream even if it is not called. `Jackson` checks encoding eagerly. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206985104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

Could you add a test case for counting both CSV and JSON source when the 
files having broken records? Any behavior change after this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206983433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

yes. To detect them with 100% guarantee, the parser must fully parse such 
records. We actually don't do that due to the column pruning mechanisms in both 
datasources - CSV and JSON. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206976717
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

If there are broken records the parser can't parse, this skipping won't 
detect them?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-30 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206400571
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
 ---
@@ -119,8 +119,47 @@ object CSVBenchmarks {
 }
   }
 
+  def countBenchmark(rowsNum: Int): Unit = {
+val colsNum = 10
+val benchmark = new Benchmark(s"Count a dataset with $colsNum 
columns", rowsNum)
+
+withTempPath { path =>
+  val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", 
IntegerType))
+  val schema = StructType(fields)
+
+  spark.range(rowsNum)
+.select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+.write
+.csv(path.getAbsolutePath)
+
+  val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+  benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+ds.select("*").filter((_: Row) => true).count()
+  }
+  benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+ds.select($"col1").filter((_: Row) => true).count()
--- End diff --

does this benchmark result vary if we select `col2` or `col10`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-30 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206059735
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => rawParser.parse(input, createParser, 
UTF8String.fromString),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord)
+parsedOptions.columnNameOfCorruptRecord,
+optimizeEmptySchema = true)
--- End diff --

> what would be the case to turn it off?

We can apply the optimization if we know in advance that one JSON object 
corresponds to one struct. In that case, we can return empty row if required 
schema (struct) is empty. If `multiLine` is enabled, there could be many 
structs per a JSON document. So, we cannot say in advance how many empty rows 
need to return without parsing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-30 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206047653
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
--- End diff --

ohh yes my bad!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-30 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206045407
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => rawParser.parse(input, createParser, 
UTF8String.fromString),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord)
+parsedOptions.columnNameOfCorruptRecord,
+optimizeEmptySchema = true)
--- End diff --

No, no I'm just wondering since you made it a parameter that you can turn 
off and on, what would be the case to turn it off?

If there is none, shouldn't we just get rid of the parameter altogether ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205978291
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 .option("multiline", "true")
 .options(Map("encoding" -> "UTF-16BE"))
 .json(testFile(fileName))
-.count()
+.collect()
--- End diff --

The test has to really touch JSON to detect encoding even without parsing. 
With this optimization `jackson` parser is not called at all in the case of 
`count()`. `collect()` guarantees that JSON parser will be invoked with wrong 
`encoding`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205978149
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
--- End diff --

It is not the same. If you return empty iterator, `count()` will always 
return `0`. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205978091
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -203,19 +203,11 @@ class UnivocityParser(
 }
   }
 
-  private val doParse = if (requiredSchema.nonEmpty) {
--- End diff --

The introduced optimization works in the case if `multiLine` is disable. In 
that case, this removed code is used. For now it is not needed anymore because 
it just duplicates optimization in some sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205977956
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => rawParser.parse(input, createParser, 
UTF8String.fromString),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord)
+parsedOptions.columnNameOfCorruptRecord,
+optimizeEmptySchema = true)
--- End diff --

Here can be only one JSON object of struct type per input string. Don't see 
any reasons to turn the optimization off. Maybe you have some examples when the 
optimization doesn't work correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205974316
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 .option("multiline", "true")
 .options(Map("encoding" -> "UTF-16BE"))
 .json(testFile(fileName))
-.count()
+.collect()
--- End diff --

just curious why going from count() to collect() here ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205974224
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -203,19 +203,11 @@ class UnivocityParser(
 }
   }
 
-  private val doParse = if (requiredSchema.nonEmpty) {
--- End diff --

are the changes here 
https://github.com/apache/spark/pull/21909/files#diff-3a4dc120191f7052e5d98db11934bfb5R63
 replacing the need for the `requiredSchema.nonEmpty` check ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205974275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
--- End diff --

nit: `Iterator.empty`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-29 Thread dmateusp
Github user dmateusp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r205969639
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 input => rawParser.parse(input, createParser, 
UTF8String.fromString),
 parsedOptions.parseMode,
 schema,
-parsedOptions.columnNameOfCorruptRecord)
+parsedOptions.columnNameOfCorruptRecord,
+optimizeEmptySchema = true)
--- End diff --

Is the case to turn off `optimizeEmptySchema` multiline JSONs ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-28 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/21909

[SPARK-24959][SQL] Speed up count() for JSON and CSV

## What changes were proposed in this pull request?

In the PR, I propose to skip invoking of the CSV/JSON parser per each line 
in the case if the required schema is empty. Added benchmarks for `count()` 
shows performance improvement up to **3.5 times**.

Before:

```
Count a dataset with 10 columns:  Best/Avg Time(ms)Rate(M/s)   Per 
Row(ns)

--
JSON count()   7676 / 7715  1.3 
767.6
CSV count()3309 / 3363  3.0 
330.9
``` 

After:

```
Count a dataset with 10 columns:  Best/Avg Time(ms)Rate(M/s)   Per 
Row(ns)

--
JSON count()   2104 / 2156  4.8 
210.4
CSV count()2332 / 2386  4.3 
233.2
```

## How was this patch tested?

It was tested by `CSVSuite` and `JSONSuite` as well as on added benchmarks.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 empty-schema-optimization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21909


commit bc4ce261a2d13be0a31b18f006da79b55880d409
Author: Maxim Gekk 
Date:   2018-07-28T15:31:20Z

Added a benchmark for count()

commit 91250d21d4bb451062873c59df6fe3b4669bc5ff
Author: Maxim Gekk 
Date:   2018-07-28T15:50:15Z

Added a CSV benchmark for count()

commit bdc5ea540b9eb62bb28606bdeb311ce5662e4bf7
Author: Maxim Gekk 
Date:   2018-07-28T15:59:44Z

Speed up count()

commit d40f9bb229ab8ea9e2d95499ae203f7c41098bcd
Author: Maxim Gekk 
Date:   2018-07-28T16:00:17Z

Updating CSV and JSON benchmarks for count()

commit abd8572497ff742ef6ea942864195be75a40ca71
Author: Maxim Gekk 
Date:   2018-07-28T16:23:03Z

Fix benchmark's output

commit 359c4fcbfdb4f4e77faa3977f381dc8e819e46fa
Author: Maxim Gekk 
Date:   2018-07-28T16:23:44Z

Uncomment other benchmarks




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org