spark git commit: [SPARK-13792][SQL] Limit logging of bad records in CSV data source
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 10c476fc8 -> 603424c16

[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?

This pull request adds a new option (`maxMalformedLogPerPartition`) to the CSV reader that limits the number of log messages Spark generates per partition for malformed records. The error log looks something like

```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?

Manually tested.

Author: Reynold Xin

Closes #13795 from rxin/SPARK-13792.

(cherry picked from commit c775bf09e0c3540f76de3f15d3fd35112a4912c1)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603424c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603424c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603424c1

Branch: refs/heads/branch-2.0
Commit: 603424c161e9be670ee8461053225364cc700515
Parents: 10c476f
Author: Reynold Xin
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin
Committed: Mon Jun 20 21:46:20 2016 -0700

--
 python/pyspark/sql/readwriter.py                |  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala         |  9 -
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +---
 5 files changed, 44 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
+                                            log for each partition. Malformed records beyond this
+                                            number will be ignored. If None is set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
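For readers who want to try the option the patch adds, here is a minimal usage sketch in Scala. The input path, the header setting, and the limit of 5 are illustrative assumptions; `maxMalformedLogPerPartition` and the `DROPMALFORMED` mode are the option and mode the patch is about.

```scala
import org.apache.spark.sql.SparkSession

object MalformedCsvExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MalformedCsvExample")
      .master("local[*]")
      .getOrCreate()

    // Drop malformed rows, but log at most 5 of them per partition
    // instead of emitting one warning per bad record.
    val df = spark.read
      .option("header", "true")                   // assumption: input has a header row
      .option("mode", "DROPMALFORMED")            // skip records that do not match the schema
      .option("maxMalformedLogPerPartition", "5") // the new option; default is 10
      .csv("/tmp/input.csv")                      // hypothetical input path

    df.show()
    spark.stop()
  }
}
```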
http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.
    * `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
    * for any given value being read.
+   * `maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+   * Spark will log for each partition. Malformed records beyond this number will be ignored.
    * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
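The CSVRelation.scala hunk itself is not shown above, but the diffstat indicates that is where most of the change lives. Below is a minimal sketch of the bounded per-partition logging pattern the commit message describes; `processPartition`, `isWellFormed`, and the comma-splitting are illustrative assumptions, not the exact code from the patch.

```scala
import org.slf4j.LoggerFactory

object BoundedMalformedLogging {
  private val log = LoggerFactory.getLogger(getClass)

  // Parse one partition's lines, logging at most `maxLog` malformed records.
  // The counter is local to the task, so the cap applies per partition.
  def processPartition(lines: Iterator[String], maxLog: Int = 10): Iterator[Array[String]] = {
    var numMalformed = 0
    lines.flatMap { line =>
      val tokens = line.split(",")
      if (isWellFormed(tokens)) {
        Some(tokens)
      } else {
        if (numMalformed < maxLog) {
          log.warn(s"Dropping malformed line: $line")
        }
        if (numMalformed == maxLog - 1) {
          // Emitted together with the last individually logged record.
          log.warn(s"More than $maxLog malformed records have been found on this " +
            "partition. Malformed records from now on will not be logged.")
        }
        numMalformed += 1
        None // DROPMALFORMED semantics: skip the record silently from here on
      }
    }
  }

  private def isWellFormed(tokens: Array[String]): Boolean =
    tokens.length == 3 // illustrative schema: exactly three columns expected
}
```

This reproduces the shape of the sample output above: ten individual warnings, then a single summary warning, then silence for the rest of the partition.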
spark git commit: [SPARK-13792][SQL] Limit logging of bad records in CSV data source
Repository: spark
Updated Branches:
  refs/heads/master 217db56ba -> c775bf09e

[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?

This pull request adds a new option (`maxMalformedLogPerPartition`) to the CSV reader that limits the number of log messages Spark generates per partition for malformed records. The error log looks something like

```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?

Manually tested.

Author: Reynold Xin

Closes #13795 from rxin/SPARK-13792.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c775bf09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c775bf09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c775bf09

Branch: refs/heads/master
Commit: c775bf09e0c3540f76de3f15d3fd35112a4912c1
Parents: 217db56
Author: Reynold Xin
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin
Committed: Mon Jun 20 21:46:12 2016 -0700

--
 python/pyspark/sql/readwriter.py                |  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala         |  9 -
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +---
 5 files changed, 44 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
+                                            log for each partition. Malformed records beyond this
+                                            number will be ignored. If None is set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.
    * `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
    * for any given value being read.
+   * `maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+   * Spark will log for each partition. Malformed records beyond this number will be ignored.
    * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
--
diff --git
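The CSVOptions.scala hunk is cut off above, but the diffstat shows a two-line addition there. Conceptually, the new option only needs to be read from the reader's parameter map as an integer with a default of 10; the following is a hedged sketch of that pattern, where the class name, `parameters`, and the `getInt` helper are assumptions rather than the exact code from the truncated hunk.

```scala
// Hedged sketch: integer option parsing with a default, in the style of a
// CSV options holder. Names here are illustrative assumptions.
class CsvOptionsSketch(parameters: Map[String, String]) {

  private def getInt(name: String, default: Int): Int =
    parameters.get(name) match {
      case None => default
      case Some(value) =>
        try value.toInt
        catch {
          case _: NumberFormatException =>
            throw new RuntimeException(s"$name should be an integer. Found $value")
        }
    }

  // The option added by this patch, with its documented default of 10.
  val maxMalformedLogPerPartition: Int = getInt("maxMalformedLogPerPartition", 10)
}
```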