Repository: spark
Updated Branches:
  refs/heads/master 23ac3aaba -> b14993e1f


[SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document

## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in the documentation.

JSON doesn't support partial results for corrupted records.
CSV supports partial results only for records with more or fewer tokens than the schema.
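
As a rough illustration of the two rules above, here is a minimal pure-Python sketch of `PERMISSIVE` mode as the updated docs describe it. It does not call Spark: `permissive_json_row`, `permissive_csv_row`, the `Row` alias, and the converter list are hypothetical names introduced only for this example. Leaving the corrupt-record slot empty for CSV rows with a token-count mismatch is an assumption taken from the docstring wording ("not a corrupted record to CSV"), not from the parser code.

```python
import json
from typing import Callable, List, Optional, Sequence, Tuple

# (field values in schema order, raw text destined for the corrupt-record field)
Row = Tuple[List[object], Optional[str]]


def permissive_json_row(line: str, fields: Sequence[str]) -> Row:
    """Toy model: a corrupted JSON record yields no partial result."""
    try:
        obj = json.loads(line)
        if not isinstance(obj, dict):
            raise ValueError("expected a JSON object")
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        # Every schema field is null; only the raw text is kept.
        return [None] * len(fields), line
    return [obj.get(name) for name in fields], None


def permissive_csv_row(
    line: str,
    converters: Sequence[Callable[[str], object]],
    sep: str = ",",
) -> Row:
    """Toy model: pad or truncate on a token-count mismatch, else all-or-nothing."""
    n = len(converters)
    tokens: List[Optional[str]] = list(line.split(sep))
    if len(tokens) < n:
        # Fewer tokens than the schema: the missing fields become null.
        tokens += [None] * (n - len(tokens))
    elif len(tokens) > n:
        # More tokens than the schema: the extra tokens are dropped.
        tokens = tokens[:n]
    try:
        values = [None if t is None else conv(t) for conv, t in zip(converters, tokens)]
    except (TypeError, ValueError):
        # A value that cannot be converted: no partial result is kept.
        return [None] * n, line
    return values, None
```

With a hypothetical schema of `[int, str, float]`, the line `1,a` keeps `1` and `a` and nulls the third field, while `x,a,2.5` loses all three values and keeps only the raw text.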

## How was this patch tested?

Pass existing tests.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #20666 from viirya/SPARK-23448-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b14993e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b14993e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b14993e1

Branch: refs/heads/master
Commit: b14993e1fcb68e1c946a671c6048605ab4afdf58
Parents: 23ac3aa
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Wed Feb 28 11:00:54 2018 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Wed Feb 28 11:00:54 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 30 +++++++++++---------
 python/pyspark/sql/streaming.py                 | 30 +++++++++++---------
 .../spark/sql/catalyst/json/JacksonParser.scala |  3 ++
 .../org/apache/spark/sql/DataFrameReader.scala  | 22 +++++++-------
 .../datasources/csv/UnivocityParser.scala       |  5 ++++
 .../spark/sql/streaming/DataStreamReader.scala  | 22 +++++++-------
 6 files changed, 64 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 49af1bc..9d05ac7 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -209,13 +209,13 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field in an output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -393,13 +393,15 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with less/more tokens than schema is not a corrupted record to CSV. \
+                  When it meets a record having fewer tokens than the length of the schema, \
+                  sets ``null`` to extra fields. When the record has more tokens than the \
+                  length of the schema, it drops extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e2a97ac..cc622de 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -442,13 +442,13 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field in an output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -621,13 +621,15 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with less/more tokens than schema is not a corrupted record to CSV. \
+                  When it meets a record having fewer tokens than the length of the schema, \
+                  sets ``null`` to extra fields. When the record has more tokens than the \
+                  length of the schema, it drops extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bd144c9..7f69569 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -357,6 +357,9 @@ class JacksonParser(
       }
     } catch {
       case e @ (_: RuntimeException | _: JsonProcessingException) =>
+        // JSON parser currently doesn't support partial results for corrupted records.
+        // For such records, all fields other than the field configured by
+        // `columnNameOfCorruptRecord` are set to `null`.
         throw BadRecordException(() => recordLiteral(record), () => None, e)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4274f12..0139913 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,12 +345,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field in an output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -550,12 +550,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
    *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
    *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
+   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
+   *     `null` to extra fields. When the record has more tokens than the length of the schema,
+   *     it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 7d6d7e7..3d6cc30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,6 +203,8 @@ class UnivocityParser(
           case _: BadRecordException => None
         }
       }
+      // For records with less or more tokens than the schema, tries to return partial results
+      // if possible.
       throw BadRecordException(
         () => getCurrentInput,
         () => getPartialResult(),
@@ -218,6 +220,9 @@ class UnivocityParser(
         row
       } catch {
         case NonFatal(e) =>
+          // For corrupted records with the number of tokens same as the schema,
+          // CSV reader doesn't support partial results. All fields other than the field
+          // configured by `columnNameOfCorruptRecord` are set to `null`.
           throw BadRecordException(() => getCurrentInput, () => None, e)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f238516..61e22fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -236,12 +236,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field in an output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -316,12 +316,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
    *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
    *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
+   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
+   *     `null` to extra fields. When the record has more tokens than the length of the schema,
+   *     it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>

