Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1a6b51078 -> 0131142d9


[SPARK-8093] [SQL] Remove empty structs inferred from JSON documents

Author: Nathan Howell <nhow...@godaddy.com>

Closes #6799 from NathanHowell/spark-8093 and squashes the following commits:

76ac3e8 [Nathan Howell] [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents

(cherry picked from commit 9814b971f07dff8a99f1b8ad2adf70614f1c690b)
Signed-off-by: Yin Huai <yh...@databricks.com>

Conflicts:
        sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0131142d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0131142d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0131142d

Branch: refs/heads/branch-1.4
Commit: 0131142d98b191f6cc112d383aa10582a3ac35bf
Parents: 1a6b510
Author: Nathan Howell <nhow...@godaddy.com>
Authored: Fri Jun 19 16:19:28 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jun 19 16:23:11 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/json/InferSchema.scala | 52 +++++++++++++-------
 .../org/apache/spark/sql/json/JsonSuite.scala   |  4 ++
 .../apache/spark/sql/json/TestJsonData.scala    |  9 ++++
 3 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0131142d/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
index 06aa19e..63764b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
@@ -43,7 +43,7 @@ private[sql] object InferSchema {
     }
 
     // perform schema inference on each row and merge afterwards
-    schemaData.mapPartitions { iter =>
+    val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       iter.map { row =>
         try {
@@ -55,8 +55,13 @@ private[sql] object InferSchema {
             StructType(Seq(StructField(columnNameOfCorruptRecords, 
StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, 
compatibleRootType) match {
-      case st: StructType => nullTypeToStringType(st)
+    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, 
compatibleRootType)
+
+    canonicalizeType(rootType) match {
+      case Some(st: StructType) => st
+      case _ =>
+        // canonicalizeType erases all empty structs, including the only one we want to keep
+        StructType(Seq())
     }
   }
 
@@ -116,22 +121,35 @@ private[sql] object InferSchema {
     }
   }
 
-  private def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable, _) =>
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType, containsNull) => ArrayType(StringType, 
containsNull)
-          case ArrayType(struct: StructType, containsNull) =>
-            ArrayType(nullTypeToStringType(struct), containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
+  /**
+   * Convert NullType to StringType and remove StructTypes with no fields
+   */
+  private def canonicalizeType: DataType => Option[DataType] = {
+    case at@ArrayType(elementType, _) =>
+      for {
+        canonicalType <- canonicalizeType(elementType)
+      } yield {
+        at.copy(canonicalType)
+      }
 
-        StructField(fieldName, newType, nullable)
-    }
+    case StructType(fields) =>
+      val canonicalFields = for {
+        field <- fields
+        if field.name.nonEmpty
+        canonicalType <- canonicalizeType(field.dataType)
+      } yield {
+        field.copy(dataType = canonicalType)
+      }
+
+      if (canonicalFields.nonEmpty) {
+        Some(StructType(canonicalFields))
+      } else {
+        // per SPARK-8093: empty structs should be deleted
+        None
+      }
 
-    StructType(fields)
+    case NullType => Some(StringType)
+    case other => Some(other)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0131142d/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index f8d62f9..96ecf8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1100,4 +1100,8 @@ class JsonSuite extends QueryTest {
     }
   }
 
+  test("SPARK-8093 Erase empty structs") {
+    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0131142d/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index 47a97a4..9322f17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -186,6 +186,15 @@ object TestJsonData {
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """]""" :: Nil)
 
+  val emptyRecords: RDD[String] =
+    TestSQLContext.sparkContext.parallelize(
+      """{""" ::
+        """""" ::
+        """{"a": {}}""" ::
+        """{"a": {"b": {}}}""" ::
+        """{"b": [{"c": {}}]}""" ::
+        """]""" :: Nil)
+
   val empty =
     TestSQLContext.sparkContext.parallelize(Seq[String]())
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to