spark git commit: [SPARK-13792][SQL] Limit logging of bad records in CSV data source

2016-06-20 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 10c476fc8 -> 603424c16


[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?
This pull request adds a new option (`maxMalformedLogPerPartition`) to the CSV reader to limit the maximum number of log messages Spark generates per partition for malformed records.

The error log looks something like this:
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```
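
For reference, here is a minimal usage sketch. The option name and its default of 10 come from this patch; the session setup, the path, and the chosen values are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any Spark 2.0+ SparkSession works.
val spark = SparkSession.builder()
  .appName("csv-malformed-logging")
  .master("local[*]")
  .getOrCreate()

// DROPMALFORMED discards bad rows; each dropped row is logged until the
// per-partition cap below is reached (the patch's default is 10).
val df = spark.read
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", "20")
  .csv("/path/to/data.csv")  // illustrative path
```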

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin 

Closes #13795 from rxin/SPARK-13792.

(cherry picked from commit c775bf09e0c3540f76de3f15d3fd35112a4912c1)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603424c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603424c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603424c1

Branch: refs/heads/branch-2.0
Commit: 603424c161e9be670ee8461053225364cc700515
Parents: 10c476f
Author: Reynold Xin 
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin 
Committed: Mon Jun 20 21:46:20 2016 -0700

--
 python/pyspark/sql/readwriter.py|  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala |  9 -
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +---
 5 files changed, 44 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
+                                            log for each partition. Malformed records beyond this
+                                            number will be ignored. If None is set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.
    * `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
    * for any given value being read.
+   * `maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+   * Spark will log for each partition. Malformed records beyond this number will be ignored.
    * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing.
    *

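Per the diffstat above, most of the change lands in CSVRelation.scala. Below is a minimal, self-contained sketch of the pattern the log output implies: warn for the first N malformed rows in a partition, emit one final notice, then stay quiet. This is illustrative rather than the patch's actual code; `parseLine` and the use of stderr in place of Spark's logger are stand-ins.

```scala
object CappedMalformedLogging {

  // Stand-in for the real option; the patch's default is 10.
  val maxMalformedLogPerPartition = 10

  // Hypothetical parser: accept a line only if it has the expected column count.
  def parseLine(line: String, expectedColumns: Int): Option[Array[String]] = {
    val tokens = line.split(",", -1)
    if (tokens.length == expectedColumns) Some(tokens) else None
  }

  // Parse one partition's lines, logging at most maxMalformedLogPerPartition bad rows.
  def parsePartition(lines: Iterator[String], expectedColumns: Int): Iterator[Array[String]] = {
    var numMalformed = 0
    lines.flatMap { line =>
      val parsed = parseLine(line, expectedColumns)
      if (parsed.isEmpty) {
        if (numMalformed < maxMalformedLogPerPartition) {
          Console.err.println(s"Dropping malformed line: $line")
        } else if (numMalformed == maxMalformedLogPerPartition) {
          Console.err.println(s"More than $maxMalformedLogPerPartition malformed records have " +
            "been found on this partition. Malformed records from now on will not be logged.")
        }
        numMalformed += 1
      }
      parsed
    }
  }
}
```

Because the counter lives in the closure, the cap applies per call, i.e. per partition, matching the "on this partition" wording in the warning.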

spark git commit: [SPARK-13792][SQL] Limit logging of bad records in CSV data source

2016-06-20 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 217db56ba -> c775bf09e


[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?
This pull request adds a new option (`maxMalformedLogPerPartition`) to the CSV reader to limit the maximum number of log messages Spark generates per partition for malformed records.

The error log looks something like this:
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin 

Closes #13795 from rxin/SPARK-13792.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c775bf09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c775bf09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c775bf09

Branch: refs/heads/master
Commit: c775bf09e0c3540f76de3f15d3fd35112a4912c1
Parents: 217db56
Author: Reynold Xin 
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin 
Committed: Mon Jun 20 21:46:12 2016 -0700

--
 python/pyspark/sql/readwriter.py|  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala |  9 -
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +---
 5 files changed, 44 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
+                                            log for each partition. Malformed records beyond this
+                                            number will be ignored. If None is set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.
    * `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
    * for any given value being read.
+   * `maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+   * Spark will log for each partition. Malformed records beyond this number will be ignored.
    * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
--
diff --git