Repository: spark Updated Branches: refs/heads/master 217db56ba -> c775bf09e
[SPARK-13792][SQL] Limit logging of bad records in CSV data source ## What changes were proposed in this pull request? This pull request adds a new option (maxMalformedLogPerPartition) in the CSV reader to limit the maximum number of log messages Spark generates per partition for malformed records. The error log looks something like ``` 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4 16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged. ``` Closes #12173 ## How was this patch tested? Manually tested. Author: Reynold Xin <r...@databricks.com> Closes #13795 from rxin/SPARK-13792. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c775bf09 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c775bf09 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c775bf09 Branch: refs/heads/master Commit: c775bf09e0c3540f76de3f15d3fd35112a4912c1 Parents: 217db56 Author: Reynold Xin <r...@databricks.com> Authored: Mon Jun 20 21:46:12 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Jun 20 21:46:12 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 4 ++ .../org/apache/spark/sql/DataFrameReader.scala | 2 + .../datasources/csv/CSVFileFormat.scala | 9 ++++- .../execution/datasources/csv/CSVOptions.scala | 2 + .../execution/datasources/csv/CSVRelation.scala | 42 +++++++++++++------- 5 files changed, 44 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 72fd184..89506ca 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils): :param maxCharsPerColumn: defines the maximum number of characters allowed for any given value being read. If None is set, it uses the default value, ``1000000``. + :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will + log for each partition. Malformed records beyond this + number are not logged. If None is set, it + uses the default value, ``10``. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. 
http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 841503b..35ba9c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * a record can have.</li> * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed * for any given value being read.</li> + * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows + * Spark will log for each partition. Malformed records beyond this number are not logged.</li> * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing.</li> * <ul> http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index be52de8..12e19f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) - 
tokenizedIterator.flatMap(parser(_).toSeq) + var numMalformedRecords = 0 + tokenizedIterator.flatMap { recordTokens => + val row = parser(recordTokens, numMalformedRecords) + if (row.isEmpty) { + numMalformedRecords += 1 + } + row + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 9f4ce83..581eda7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str val escapeQuotes = getBool("escapeQuotes", true) + val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10) + val inputBufferSize = 128 val isCommentSet = this.comment != '\u0000' http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index d72c8b9..083ac33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -50,10 +50,19 @@ object CSVRelation extends Logging { } } + /** + * Returns a function that parses a single CSV record (in the form of an array of strings in which + * each element represents a 
column) and turns it into either one resulting row or no row (if + * the record is malformed). + * + * The 2nd argument in the returned function represents the total number of malformed rows + * observed so far. + */ + // This is pretty convoluted and we should probably rewrite the entire CSV parsing soon. def csvParser( schema: StructType, requiredColumns: Array[String], - params: CSVOptions): Array[String] => Option[InternalRow] = { + params: CSVOptions): (Array[String], Int) => Option[InternalRow] = { val schemaFields = schema.fields val requiredFields = StructType(requiredColumns.map(schema(_))).fields val safeRequiredFields = if (params.dropMalformed) { @@ -72,9 +81,16 @@ object CSVRelation extends Logging { val requiredSize = requiredFields.length val row = new GenericMutableRow(requiredSize) - (tokens: Array[String]) => { + (tokens: Array[String], numMalformedRows) => { if (params.dropMalformed && schemaFields.length != tokens.length) { - logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + if (numMalformedRows < params.maxMalformedLogPerPartition) { + logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + } + if (numMalformedRows == params.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${params.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } None } else if (params.failFast && schemaFields.length != tokens.length) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + @@ -109,23 +125,21 @@ object CSVRelation extends Logging { Some(row) } catch { case NonFatal(e) if params.dropMalformed => - logWarning("Parse exception. " + - s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + if (numMalformedRows < params.maxMalformedLogPerPartition) { + logWarning("Parse exception. 
" + + s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + } + if (numMalformedRows == params.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${params.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } None } } } } - def parseCsv( - tokenizedRDD: RDD[Array[String]], - schema: StructType, - requiredColumns: Array[String], - options: CSVOptions): RDD[InternalRow] = { - val parser = csvParser(schema, requiredColumns, options) - tokenizedRDD.flatMap(parser(_).toSeq) - } - // Skips the header line of each file if the `header` option is set to true. def dropHeaderLine( file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org