[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r211129911

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +58,15 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
--- End diff --

Not a big deal, but I would leave a comment to explain why it is permissive and non-multiline only. I assume counts are known when the input is actually parsed in multiline cases, and counts should be given in any case when the mode is permissive, right?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
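The three-part condition under discussion can be sketched as a standalone predicate. This is a simplified illustrative model, not Spark's actual class; the object and parameter names here are assumptions:

```scala
// Simplified model of the skip-parsing condition (not Spark's FailureSafeParser).
// Parsing can be bypassed only when all three conditions hold:
//  - the input is line-delimited, so one input line maps to exactly one record
//    and the record count is known without parsing;
//  - the mode is PERMISSIVE, so even malformed records would yield a row anyway;
//  - the required schema is empty (as for count()), so no column values are needed.
object SkipParsingModel {
  def skipParsing(isMultiLine: Boolean, mode: String, schema: Seq[String]): Boolean =
    !isMultiLine && mode == "PERMISSIVE" && schema.isEmpty
}
```

Any of the three conditions failing forces the raw parser to run, since either the record count or the record validity would otherwise be unknown.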
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21909
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r211075385

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
       input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord)
+      parser.options.columnNameOfCorruptRecord,
+      optimizeEmptySchema = false)
--- End diff --

renamed
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r211075384

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
         "This usually speeds up commands that need to list many directories.")
       .booleanConf
       .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+    buildConf("spark.sql.legacy.bypassParserForEmptySchema")
--- End diff --

It seems we don't need it anymore.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r211045699

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
       input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord)
+      parser.options.columnNameOfCorruptRecord,
+      optimizeEmptySchema = false)
--- End diff --

Could we rename `optimizeEmptySchema` to `isMultiLine`?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r211045061

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
         "This usually speeds up commands that need to list many directories.")
       .booleanConf
       .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+    buildConf("spark.sql.legacy.bypassParserForEmptySchema")
--- End diff --

If there is no behavior change, do we still need this conf?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210767018

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }

-  test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .count()
+  def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+    var result: Long = -1
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) {
+      val fileName = "test-data/utf16LE.json"
+      val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+      result = spark.read.schema(schema)
+        .option("mode", "FAILFAST")
--- End diff --

This sounds good! Let us enable it only when PERMISSIVE is on. You know, our default mode is PERMISSIVE, so this should benefit most users.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210765672

--- Diff: docs/sql-programming-guide.md ---
@@ -1894,6 +1894,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
   - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
   - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of the count() action. For example, Spark 2.3 and earlier versions failed on JSON files with invalid encoding but Spark 2.4 returns total number of lines in the file. To restore the previous behavior when the underlying parser is always invoked even for the empty schema, set `true` to `spark.sql.legacy.bypassParserForEmptySchema`. This option will be removed in Spark 3.0.
--- End diff --

Is it right based on what you said in https://github.com/apache/spark/pull/21909#discussion_r210704902?
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210704902

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }

-  test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .count()
+  def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+    var result: Long = -1
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) {
+      val fileName = "test-data/utf16LE.json"
+      val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+      result = spark.read.schema(schema)
+        .option("mode", "FAILFAST")
--- End diff --

> Does the mode matter?

I just want to have an explicit error in the test instead of `0` for `count()` (`DROPMALFORMED`), or a full table of nulls or an exception (`PERMISSIVE`), since an exception is the expected result.

> What happened if users use DROPMALFORMED before this PR?

It depends on `multiLine`. If it is `true`, the behaviour before and after the PR is the same, since the optimization does not affect the `multiLine` mode. If `multiLine` is `false`, after the PR the result is `5` (the total number of lines); before the PR it was `0` in the `DROPMALFORMED` mode. We can enable this optimization for the `PERMISSIVE` mode only, to exclude any deviation in outputs.
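The DROPMALFORMED difference described above can be modeled without Spark. This is a toy sketch with hypothetical names: a 5-line file that the parser cannot decode at all, contrasting "count what parses" with "count the lines":

```scala
// Toy model of the count() difference for a 5-line file whose every line
// is malformed (e.g. the whole file has a wrong encoding).
object CountModel {
  val lines: Seq[String] = Seq.fill(5)("<undecodable line>")

  // Stand-in parser: every line fails to parse.
  def parse(line: String): Option[Map[String, Any]] = None

  // Before the PR, DROPMALFORMED drops records that fail to parse.
  def countWithParsing: Long = lines.flatMap(parse).size.toLong

  // After the PR, the parser is bypassed for an empty required schema:
  // each line yields one row, so count() equals the number of lines.
  def countBypassingParser: Long = lines.size.toLong
}
```

This is the `0` versus `5` deviation the comment refers to, and why restricting the optimization to PERMISSIVE mode sidesteps it.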
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210693829

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2223,21 +2223,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }

-  test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .count()
+  def doCount(bypassParser: Boolean, multiLine: Boolean): Long = {
+    var result: Long = -1
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) {
+      val fileName = "test-data/utf16LE.json"
+      val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+      result = spark.read.schema(schema)
+        .option("mode", "FAILFAST")
--- End diff --

Does the mode matter? What happens if users used `DROPMALFORMED` before this PR?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210666117

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1492,6 +1492,15 @@ object SQLConf {
         "This usually speeds up commands that need to list many directories.")
       .booleanConf
       .createWithDefault(true)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
+    buildConf("spark.sql.legacy.bypassParserForEmptySchema")
+      .doc("If required schema passed to a text datasource is empty, the parameter controls " +
+        "invocation of underlying parser. For example, if it is set to false, uniVocity parser " +
+        "is invoke by CSV datasource or Jackson parser by JSON datasource. By default, it is set " +
--- End diff --

`invoke` -> `invoked`
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r210350101

--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of count(), for example. To set `true` to `spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior when the underlying parser is always invoked even for the empty schema. This option will be removed in Spark 3.0.
--- End diff --

> Does it also mean the result of count() can change if the json/csv files contain malformed records?

@cloud-fan The only difference I have found so far is when a whole JSON file is invalid, i.e. it has a wrong encoding, for example. In other cases `count()` returns the same.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r209927964

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

I added the tests.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r209916711

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

   test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .count()
-    }
-    val errMsg = exception.getMessage
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") {
--- End diff --

I will revert the changes for this test because it enables multiLine mode, for which the optimization is not applicable.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r208022783

--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of count(), for example. To set `true` to `spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior when the underlying parser is always invoked even for the empty schema. This option will be removed in Spark 3.0.
--- End diff --

For CSV, the result should be the same since we don't invoke the parser when CSV column pruning is enabled (by default). For JSON, the result can be different, as in the test which I changed in this PR. We call Jackson's method `nextToken()` even for an empty schema, and besides that, the Jackson parser touches the input stream on instantiation. I will add a few tests for JSON as @gatorsmile asked.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207955299

--- Diff: docs/sql-programming-guide.md ---
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
+  - Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of count(), for example. To set `true` to `spark.sql.legacy.bypassParserForEmptySchema` restores the previous behavior when the underlying parser is always invoked even for the empty schema. This option will be removed in Spark 3.0.
--- End diff --

Let's highlight the behavior change, i.e. malformed records won't be reported when doing `count()`. Does it also mean the result of `count()` can change if the json/csv files contain malformed records?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207850329

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

   test("SPARK-23723: specified encoding is not matched to actual encoding") {
-    val fileName = "test-data/utf16LE.json"
-    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
-    val exception = intercept[SparkException] {
-      spark.read.schema(schema)
-        .option("mode", "FAILFAST")
-        .option("multiline", "true")
-        .options(Map("encoding" -> "UTF-16BE"))
-        .json(testFile(fileName))
-        .count()
-    }
-    val errMsg = exception.getMessage
+    withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") {
--- End diff --

How about CSV? Could you add the same one too? Also, we need to add the verification logic when the conf is true:

```
Seq(true, false).foreach { optimizeEmptySchema =>
  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> optimizeEmptySchema.toString) {
    ...
  }
```
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207738315

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
         "are performed before any UNION, EXCEPT and MINUS operations.")
       .booleanConf
       .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

I renamed it to `spark.sql.legacy.bypassParserForEmptySchema`.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207701331

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
         "are performed before any UNION, EXCEPT and MINUS operations.")
       .booleanConf
       .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

Let us get rid of this in the next release. Mark it as internal and use the legacy scheme.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207032024

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

Both? If we introduce a behavior change, we need to document it in the migration guide and add a conf. Users can use the conf to revert back to the previous behavior.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207005019

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

> ... when the files having broken records?

Syntactically broken or semantically (wrong types, for example)?

> Any behavior change after this PR?

We have many tests in `CSVSuite` and `JSONSuite` for broken records. I have found a behavior change in only one case: https://github.com/apache/spark/pull/21909/files#diff-fde14032b0e6ef8086461edf79a27c5dL2227. This is because the Jackson parser touches the first few bytes of the input stream even if it is not called: Jackson checks the encoding eagerly.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206985104

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

Could you add a test case for counting both CSV and JSON sources when the files have broken records? Any behavior change after this PR?
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206983433

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

Yes. To detect them with a 100% guarantee, the parser must fully parse such records. We actually don't do that anyway, due to the column pruning mechanisms in both datasources, CSV and JSON.
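A minimal sketch makes the point concrete: with skipping enabled, the raw parser is never invoked, so each record yields an empty row even when parsing would have thrown. The class shape below is an assumption for illustration, not the actual FailureSafeParser:

```scala
// Minimal sketch of a failure-safe parser with the empty-schema skip path.
class FailureSafeParserSketch[IN](
    rawParser: IN => Iterator[Seq[Any]],
    skipParsing: Boolean) {

  def parse(input: IN): Iterator[Seq[Any]] =
    if (skipParsing) {
      // One empty row per record; the raw parser is never touched,
      // so malformed input cannot be detected on this path.
      Iterator.single(Seq.empty)
    } else {
      rawParser(input)
    }
}
```

Even a raw parser that always throws produces exactly one row per record on the skip path, which is why broken records go unreported.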
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206976717

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
--- End diff --

If there are broken records that the parser can't parse, this skipping won't detect them?
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206400571

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---
@@ -119,8 +119,47 @@ object CSVBenchmarks {
     }
   }

+  def countBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 10
+    val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+        ds.select("*").filter((_: Row) => true).count()
+      }
+      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+        ds.select($"col1").filter((_: Row) => true).count()
--- End diff --

does this benchmark result vary if we select `col2` or `col10`?
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206059735

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       input => rawParser.parse(input, createParser, UTF8String.fromString),
       parsedOptions.parseMode,
       schema,
-      parsedOptions.columnNameOfCorruptRecord)
+      parsedOptions.columnNameOfCorruptRecord,
+      optimizeEmptySchema = true)
--- End diff --

> what would be the case to turn it off?

We can apply the optimization if we know in advance that one JSON object corresponds to one struct. In that case, we can return an empty row if the required schema (struct) is empty. If `multiLine` is enabled, there could be many structs per JSON document, so we cannot say in advance how many empty rows to return without parsing.
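The point above can be illustrated with a toy sketch in plain Scala (not Spark code): in non-multiline mode every input line is exactly one record, so the row count is known without ever invoking a JSON parser; in multiline mode a single document may contain any number of structs, and only parsing reveals how many.

```scala
// Non-multiline JSON: one record per line, so counting needs no parsing at all.
val lineDelimited = Seq("""{"a": 1}""", """{"a": 2}""", """{"a": 3}""")
val countWithoutParsing = lineDelimited.size
println(countWithoutParsing) // 3

// multiLine JSON: one document may hold many structs; only parsing reveals how many.
val multiLineDoc = """[{"a": 1}, {"a": 2}]"""
// Crude stand-in for a real JSON parser: count top-level object openings in the array.
val parsedStructs = multiLineDoc.count(_ == '{')
println(parsedStructs) // 2
```

The counting of `'{'` characters is of course only a stand-in for real parsing; the point is that the multiline count depends on the document's contents, not on its line structure.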
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206047653

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
--- End diff --

ohh yes, my bad!
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206045407

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       input => rawParser.parse(input, createParser, UTF8String.fromString),
       parsedOptions.parseMode,
       schema,
-      parsedOptions.columnNameOfCorruptRecord)
+      parsedOptions.columnNameOfCorruptRecord,
+      optimizeEmptySchema = true)
--- End diff --

No, no, I'm just wondering: since you made it a parameter that you can turn off and on, what would be the case to turn it off? If there is none, shouldn't we just get rid of the parameter altogether?
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205978291

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("multiline", "true")
         .options(Map("encoding" -> "UTF-16BE"))
         .json(testFile(fileName))
-        .count()
+        .collect()
--- End diff --

The test has to actually touch the JSON content to detect the encoding. With this optimization, the `jackson` parser is not called at all in the case of `count()`. `collect()` guarantees that the JSON parser will be invoked and will hit the wrong `encoding`.
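The distinction can be modeled with a lazily invoked parser (a hypothetical sketch, not Spark's execution model; `LazyRecords` and its methods are made up for illustration): counting records never forces the parse step, so an error such as a wrong encoding surfaces only when values are actually materialized.

```scala
// Sketch: count() vs collect() against a lazily applied parser (illustrative only).
class LazyRecords(raw: Seq[String], parse: String => String) {
  // count() analogue: one row per record, the parser is never invoked.
  def count(): Int = raw.size
  // collect() analogue: materializes the values, forcing the parser to run.
  def collect(): Seq[String] = raw.map(parse)
}

// A "parser" that fails the way a read with the wrong encoding would.
val failingParse: String => String = _ => throw new RuntimeException("wrong encoding")
val ds = new LazyRecords(Seq("rec1", "rec2"), failingParse)

println(ds.count()) // 2 -- the bad encoding goes unnoticed
val detected = try { ds.collect(); false } catch { case _: RuntimeException => true }
println(detected)   // true -- collect() invokes the parser and hits the error
```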
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205978149

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
--- End diff --

It is not the same. If you return an empty iterator, `count()` will always return `0`.
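The difference is easy to check in plain Scala: `count()` effectively sums the number of rows produced per record, so an empty iterator contributes nothing while a single empty row contributes exactly one.

```scala
// Why Iterator.single(InternalRow.empty) rather than Iterator.empty (illustrative sketch).
val records = Seq("r1", "r2", "r3")

// Per record, emit one empty row: the count sees every record.
val withSingle = records.iterator.flatMap(_ => Iterator.single(())).size
println(withSingle) // 3

// Per record, emit nothing: the count would always be zero.
val withEmpty = records.iterator.flatMap(_ => Iterator.empty).size
println(withEmpty)  // 0
```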
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205978091

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -203,19 +203,11 @@ class UnivocityParser(
     }
   }

-  private val doParse = if (requiredSchema.nonEmpty) {
--- End diff --

The introduced optimization works only in the case when `multiLine` is disabled, and that is the case in which this removed code was used. It is not needed anymore because it just duplicated the optimization.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205977956

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       input => rawParser.parse(input, createParser, UTF8String.fromString),
       parsedOptions.parseMode,
       schema,
-      parsedOptions.columnNameOfCorruptRecord)
+      parsedOptions.columnNameOfCorruptRecord,
+      optimizeEmptySchema = true)
--- End diff --

Here there can be only one JSON object of struct type per input string. I don't see any reason to turn the optimization off. Maybe you have some examples where the optimization doesn't work correctly?
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205974316

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("multiline", "true")
         .options(Map("encoding" -> "UTF-16BE"))
         .json(testFile(fileName))
-        .count()
+        .collect()
--- End diff --

just curious why we are going from `count()` to `collect()` here?
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205974224

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -203,19 +203,11 @@ class UnivocityParser(
     }
   }

-  private val doParse = if (requiredSchema.nonEmpty) {
--- End diff --

are the changes here https://github.com/apache/spark/pull/21909/files#diff-3a4dc120191f7052e5d98db11934bfb5R63 replacing the need for the `requiredSchema.nonEmpty` check?
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205974275

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
--- End diff --

nit: `Iterator.empty`
Github user dmateusp commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r205969639

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       input => rawParser.parse(input, createParser, UTF8String.fromString),
       parsedOptions.parseMode,
       schema,
-      parsedOptions.columnNameOfCorruptRecord)
+      parsedOptions.columnNameOfCorruptRecord,
+      optimizeEmptySchema = true)
--- End diff --

Is the case for turning off `optimizeEmptySchema` multiline JSONs?
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/21909

[SPARK-24959][SQL] Speed up count() for JSON and CSV

## What changes were proposed in this pull request?

In the PR, I propose to skip invoking the CSV/JSON parser for each line in the case when the required schema is empty. Added benchmarks for `count()` show a performance improvement of up to **3.5 times**.

Before:
```
Count a dataset with 10 columns:      Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
-----------------------------------------------------------------------------------
JSON count()                                7676 / 7715          1.3          767.6
CSV count()                                 3309 / 3363          3.0          330.9
```

After:
```
Count a dataset with 10 columns:      Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
-----------------------------------------------------------------------------------
JSON count()                                2104 / 2156          4.8          210.4
CSV count()                                 2332 / 2386          4.3          233.2
```

## How was this patch tested?

It was tested by `CSVSuite` and `JSONSuite` as well as by the added benchmarks.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 empty-schema-optimization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21909.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21909

commit bc4ce261a2d13be0a31b18f006da79b55880d409
Author: Maxim Gekk
Date: 2018-07-28T15:31:20Z

    Added a benchmark for count()

commit 91250d21d4bb451062873c59df6fe3b4669bc5ff
Author: Maxim Gekk
Date: 2018-07-28T15:50:15Z

    Added a CSV benchmark for count()

commit bdc5ea540b9eb62bb28606bdeb311ce5662e4bf7
Author: Maxim Gekk
Date: 2018-07-28T15:59:44Z

    Speed up count()

commit d40f9bb229ab8ea9e2d95499ae203f7c41098bcd
Author: Maxim Gekk
Date: 2018-07-28T16:00:17Z

    Updating CSV and JSON benchmarks for count()

commit abd8572497ff742ef6ea942864195be75a40ca71
Author: Maxim Gekk
Date: 2018-07-28T16:23:03Z

    Fix benchmark's output

commit 359c4fcbfdb4f4e77faa3977f381dc8e819e46fa
Author: Maxim Gekk
Date: 2018-07-28T16:23:44Z

    Uncomment other benchmarks