Repository: spark
Updated Branches:
  refs/heads/master f0496ee10 -> 4352a2fda


[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException

JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <h...@cse.ohio-state.edu>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
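
For context, a minimal PySpark sketch of the query shape this change fixes (the local SparkContext/SQLContext setup and the single JSON record are illustrative, not part of the patch): selecting array values, both at the top level and inside a nested JSON object, previously raised java.lang.IllegalArgumentException when results were converted back to Python.

    # Illustrative reproduction for SPARK-2376; the names sc and sqlCtx are assumed,
    # and the record mirrors the doctest data added in python/pyspark/sql.py below.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext("local", "spark-2376")
    sqlCtx = SQLContext(sc)

    # field3 is a nested object holding the array field5; field6 is an array of objects.
    json = sc.parallelize(['{"field1": 2, "field3": {"field4": 22, "field5": [10, 11]}, '
                           '"field6": [{"field7": "row2"}]}'])
    srdd = sqlCtx.jsonRDD(json)
    sqlCtx.registerRDDAsTable(srdd, "table1")

    # Before this change, collecting rows containing array values failed during the
    # conversion to Python dictionaries; now arrays come back as plain Python lists
    # (lists of dicts for arrays of JSON objects).
    print(sqlCtx.sql("SELECT field3 AS f3, field6 AS f4 FROM table1").collect())
    # e.g. [{'f3': {'field4': 22, 'field5': [10, 11]}, 'f4': [{'field7': 'row2'}]}]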


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4352a2fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4352a2fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4352a2fd

Branch: refs/heads/master
Commit: 4352a2fdaa64efee7158eabef65703460ff284ec
Parents: f0496ee
Author: Yin Huai <h...@cse.ohio-state.edu>
Authored: Mon Jul 7 18:37:38 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Jul 7 18:37:38 2014 -0700

----------------------------------------------------------------------
 python/pyspark/sql.py                           | 24 ++++++-----
 .../scala/org/apache/spark/sql/SchemaRDD.scala  | 45 +++++++++++++-------
 2 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4352a2fd/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 5051c82..ffe1775 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -152,10 +152,12 @@ class SQLContext:
         >>> ofn.close()
         >>> srdd = sqlCtx.jsonFile(jsonFile)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ...                     {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ...                     {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 
from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, 
"f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, 
"f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, 
"f4":None}]
         True
         """
         jschema_rdd = self._ssql_ctx.jsonFile(path)
@@ -167,10 +169,12 @@ class SQLContext:
 
         >>> srdd = sqlCtx.jsonRDD(json)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ...                     {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ...                     {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 
from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, 
"f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, 
"f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, 
"f4":None}]
         True
         """
         def func(split, iterator):
@@ -492,8 +496,8 @@ def _test():
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
     jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-       '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
-       '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+       '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, 
"field6":[{"field7": "row2"}]}',
+       '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": 
[]}}']
     globs['jsonStrings'] = jsonStrings
     globs['json'] = sc.parallelize(jsonStrings)
     globs['nestedRdd1'] = sc.parallelize([

http://git-wip-us.apache.org/repos/asf/spark/blob/4352a2fd/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8f9f54f..8bcfc7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.util.{Map => JMap, List => JList, Set => JSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
@@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
 
 /**
  * :: AlphaComponent ::
@@ -359,6 +363,28 @@ class SchemaRDD(
         case (obj, (name, dataType)) =>
           dataType match {
             case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case array @ ArrayType(struct: StructType) =>
+              val arrayValues = obj match {
+                case seq: Seq[Any] =>
+                  seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
+                case list: JList[Any] =>
+                  list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case set: JSet[Any] =>
+                  set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case array if array != null && array.getClass.isArray =>
+                  array.asInstanceOf[Array[Any]].map {
+                    element => rowToMap(element.asInstanceOf[Row], struct)
+                  }
+                case other => other
+              }
+              map.put(name, arrayValues)
+            case array: ArrayType => {
+              val arrayValues = obj match {
+                case seq: Seq[Any] => seq.asJava
+                case other => other
+              }
+              map.put(name, arrayValues)
+            }
             case other => map.put(name, obj)
           }
       }
@@ -366,22 +392,11 @@ class SchemaRDD(
       map
     }
 
-    // TODO: Actually, the schema of a row should be represented by a StructType instead of
-    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-    // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
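
To make the Scala change above easier to follow, here is a rough Python analogue of the new rowToMap behaviour (the function name, the tuple-based schema encoding, and the sample data are invented for illustration, not part of the patch): struct fields recurse, arrays of structs convert each element, and everything else, including plain arrays, is passed through.

    # Illustrative only: a simplified Python analogue of SchemaRDD.rowToMap after this
    # patch. Schemas are encoded as (kind, detail) tuples or plain primitive names.
    def row_to_map(row, schema):
        """row: tuple of values; schema: list of (name, data_type) pairs."""
        out = {}
        for value, (name, data_type) in zip(row, schema):
            kind = data_type[0] if isinstance(data_type, tuple) else data_type
            if kind == "struct":
                # Nested objects recurse, as in the existing StructType branch.
                out[name] = row_to_map(value, data_type[1])
            elif kind == "array" and isinstance(data_type[1], tuple) and data_type[1][0] == "struct":
                # New behaviour: an array of objects becomes a list of dicts.
                out[name] = None if value is None else [row_to_map(v, data_type[1][1]) for v in value]
            else:
                # Primitives and plain arrays are passed through unchanged.
                out[name] = value
        return out

    # Mirrors the second JSON record from the updated doctest data above.
    schema = [("field1", "int"),
              ("field3", ("struct", [("field4", "int"), ("field5", ("array", "int"))])),
              ("field6", ("array", ("struct", [("field7", "string")])))]
    print(row_to_map((2, (22, [10, 11]), [("row2",)]), schema))
    # {'field1': 2, 'field3': {'field4': 22, 'field5': [10, 11]}, 'field6': [{'field7': 'row2'}]}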
