Repository: spark
Updated Branches:
  refs/heads/master b0a935255 -> e219e692e


[SPARK-23772][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference

## What changes were proposed in this pull request?
This PR adds a new JSON option `dropFieldIfAllNull` to ignore columns whose values are all null, or all empty arrays/structs, during JSON schema inference.
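
For illustration, a minimal usage sketch (the input path is a placeholder, not part of this patch):

```scala
// With the option enabled, a field that is null in every sampled record
// (or always an empty array/struct) is dropped from the inferred schema.
val df = spark.read
  .option("dropFieldIfAllNull", true)
  .json("/path/to/input.json")  // placeholder path
df.printSchema()
```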

## How was this patch tested?
Added tests in `JsonSuite`.

Author: Takeshi Yamamuro <yamam...@apache.org>
Author: Xiangrui Meng <m...@databricks.com>

Closes #20929 from maropu/SPARK-23772.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e219e692
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e219e692
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e219e692

Branch: refs/heads/master
Commit: e219e692ef70c161f37a48bfdec2a94b29260004
Parents: b0a9352
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Tue Jun 19 00:24:54 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Jun 19 00:24:54 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  5 +-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  3 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/json/JsonInferSchema.scala      | 40 ++++++++--------
 .../spark/sql/streaming/DataStreamReader.scala  |  2 +
 .../execution/datasources/json/JsonSuite.scala  | 49 ++++++++++++++++++++
 6 files changed, 80 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a0e20d3..3efe2ad 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -177,7 +177,7 @@ class DataFrameReader(OptionUtils):
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
-             encoding=None):
+             dropFieldIfAllNull=None, encoding=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
 
@@ -246,6 +246,9 @@ class DataFrameReader(OptionUtils):
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
         :param samplingRatio: defines fraction of input JSON objects used for schema inferring.
                               If None is set, it uses the default value, ``1.0``.
+        :param dropFieldIfAllNull: whether to ignore column of all null values or empty
+                                   array/struct during schema inference. If None is set, it
+                                   uses the default value, ``false``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes

http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 2ff12ac..c081772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -73,6 +73,9 @@ private[sql] class JSONOptions(
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
+  // Whether to ignore column of all null values or empty array/struct during schema inference
+  val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)
+
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
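
A quick sketch of the option-parsing semantics above (plain Scala, not part of the patch): a missing key falls back to `false`, and the value is parsed with Scala's `String.toBoolean`, which accepts "true"/"false" case-insensitively and throws IllegalArgumentException for anything else:

    Map("dropFieldIfAllNull" -> "TRUE").get("dropFieldIfAllNull")
      .map(_.toBoolean).getOrElse(false)   // => true
    Map.empty[String, String].get("dropFieldIfAllNull")
      .map(_.toBoolean).getOrElse(false)   // => false (option absent)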
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index de6be5f..ec9352a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -381,6 +381,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * that should be used for parsing.</li>
    * <li>`samplingRatio` (default is 1.0): defines fraction of input JSON objects used
    * for schema inferring.</li>
+   * <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
+   * empty array/struct during schema inference.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index e7eed95..f6edc7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -75,7 +75,7 @@ private[sql] object JsonInferSchema {
     // active SparkSession and `SQLConf.get` may point to the wrong configs.
     val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
 
-    canonicalizeType(rootType) match {
+    canonicalizeType(rootType, configOptions) match {
       case Some(st: StructType) => st
       case _ =>
         // canonicalizeType erases all empty structs, including the only one we want to keep
@@ -181,33 +181,33 @@ private[sql] object JsonInferSchema {
   }
 
   /**
-   * Convert NullType to StringType and remove StructTypes with no fields
+   * Recursively canonicalizes inferred types, e.g., removes StructTypes with no fields,
+   * drops NullTypes or converts them to StringType based on provided options.
    */
-  private def canonicalizeType(tpe: DataType): Option[DataType] = tpe match {
-    case at @ ArrayType(elementType, _) =>
-      for {
-        canonicalType <- canonicalizeType(elementType)
-      } yield {
-        at.copy(canonicalType)
-      }
+  private def canonicalizeType(tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
+    case at: ArrayType =>
+      canonicalizeType(at.elementType, options)
+        .map(t => at.copy(elementType = t))
 
     case StructType(fields) =>
-      val canonicalFields: Array[StructField] = for {
-        field <- fields
-        if field.name.length > 0
-        canonicalType <- canonicalizeType(field.dataType)
-      } yield {
-        field.copy(dataType = canonicalType)
+      val canonicalFields = fields.filter(_.name.nonEmpty).flatMap { f =>
+        canonicalizeType(f.dataType, options)
+          .map(t => f.copy(dataType = t))
       }
-
-      if (canonicalFields.length > 0) {
-        Some(StructType(canonicalFields))
+      // SPARK-8093: empty structs should be deleted
+      if (canonicalFields.isEmpty) {
+        None
       } else {
-        // per SPARK-8093: empty structs should be deleted
+        Some(StructType(canonicalFields))
+      }
+
+    case NullType =>
+      if (options.dropFieldIfAllNull) {
         None
+      } else {
+        Some(StringType)
       }
 
-    case NullType => Some(StringType)
     case other => Some(other)
   }
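
To make the rewrite above concrete, an illustrative sketch (not part of the patch) of what canonicalization does to an inferred schema when `dropFieldIfAllNull` is enabled:

    import org.apache.spark.sql.types._

    // Schema as inferred before canonicalization: "a" was null in every
    // record and "b" was always an empty struct {}.
    val inferred = StructType(Seq(
      StructField("a", NullType),         // all-null column -> dropped
      StructField("b", StructType(Nil)),  // empty struct -> dropped (SPARK-8093)
      StructField("c", LongType)))        // real data -> kept
    // canonicalizeType(inferred, options) then yields:
    // Some(StructType(StructField("c", LongType) :: Nil))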
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index ae93965..ef8dc3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -270,6 +270,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * per file</li>
    * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
    * that should be used for parsing.</li>
+   * <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
+   * empty array/struct during schema inference.</li>
    * </ul>
    *
    * @since 2.0.0
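
A usage sketch for the streaming reader (the directory path is a placeholder): file-based streaming sources need either an explicit schema or `spark.sql.streaming.schemaInference` enabled before this option can take effect during inference:

    spark.conf.set("spark.sql.streaming.schemaInference", "true")
    val stream = spark.readStream
      .option("dropFieldIfAllNull", true)
      .json("/path/to/json/dir")   // placeholder path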

http://git-wip-us.apache.org/repos/asf/spark/blob/e219e692/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4b3921c..a8a4a52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2427,4 +2427,53 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()),
       Row(badJson))
   }
+
+  test("SPARK-23772 ignore column of all null values or empty array during 
schema inference") {
+     withTempPath { tempDir =>
+      val path = tempDir.getAbsolutePath
+
+      // primitive types
+      Seq(
+        """{"a":null, "b":1, "c":3.0}""",
+        """{"a":null, "b":null, "c":"string"}""",
+        """{"a":null, "b":null, "c":null}""")
+        .toDS().write.text(path)
+      var df = spark.read.format("json")
+        .option("dropFieldIfAllNull", true)
+        .load(path)
+      var expectedSchema = new StructType()
+        .add("b", LongType).add("c", StringType)
+      assert(df.schema === expectedSchema)
+      checkAnswer(df, Row(1, "3.0") :: Row(null, "string") :: Row(null, null) :: Nil)
+
+      // arrays
+      Seq(
+        """{"a":[2, 1], "b":[null, null], "c":null, "d":[[], [null]], "e":[[], 
null, [[]]]}""",
+        """{"a":[null], "b":[null], "c":[], "d":[null, []], "e":null}""",
+        """{"a":null, "b":null, "c":[], "d":null, "e":[null, [], null]}""")
+        .toDS().write.mode("overwrite").text(path)
+      df = spark.read.format("json")
+        .option("dropFieldIfAllNull", true)
+        .load(path)
+      expectedSchema = new StructType()
+        .add("a", ArrayType(LongType))
+      assert(df.schema === expectedSchema)
+      checkAnswer(df, Row(Array(2, 1)) :: Row(Array(null)) :: Row(null) :: Nil)
+
+      // structs
+      Seq(
+        """{"a":{"a1": 1, "a2":"string"}, "b":{}}""",
+        """{"a":{"a1": 2, "a2":null}, "b":{"b1":[null]}}""",
+        """{"a":null, "b":null}""")
+        .toDS().write.mode("overwrite").text(path)
+      df = spark.read.format("json")
+        .option("dropFieldIfAllNull", true)
+        .load(path)
+      expectedSchema = new StructType()
+        .add("a", StructType(StructField("a1", LongType) :: StructField("a2", 
StringType)
+          :: Nil))
+      assert(df.schema === expectedSchema)
+      checkAnswer(df, Row(Row(1, "string")) :: Row(Row(2, null)) :: Row(null) :: Nil)
+    }
+  }
 }

