Repository: spark
Updated Branches:
  refs/heads/master f0496ee10 -> 4352a2fda
[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException

JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <h...@cse.ohio-state.edu>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4352a2fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4352a2fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4352a2fd

Branch: refs/heads/master
Commit: 4352a2fdaa64efee7158eabef65703460ff284ec
Parents: f0496ee
Author: Yin Huai <h...@cse.ohio-state.edu>
Authored: Mon Jul 7 18:37:38 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Jul 7 18:37:38 2014 -0700

----------------------------------------------------------------------
 python/pyspark/sql.py                           | 24 ++++++-----
 .../scala/org/apache/spark/sql/SchemaRDD.scala  | 45 +++++++++++++-------
 2 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4352a2fd/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 5051c82..ffe1775 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -152,10 +152,12 @@ class SQLContext:
         >>> ofn.close()
         >>> srdd = sqlCtx.jsonFile(jsonFile)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
         """
         jschema_rdd = self._ssql_ctx.jsonFile(path)
@@ -167,10 +169,12 @@ class SQLContext:
 
         >>> srdd = sqlCtx.jsonRDD(json)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
         """
         def func(split, iterator):
@@ -492,8 +496,8 @@ def _test():
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
     jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-       '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
-       '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+       '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
+       '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
     globs['jsonStrings'] = jsonStrings
     globs['json'] = sc.parallelize(jsonStrings)
     globs['nestedRdd1'] = sc.parallelize([
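For orientation, here is a minimal, self-contained sketch of the scenario the doctests above exercise, assuming a running SparkContext bound to sc; the SQLContext calls are the same ones used in the doctests, and the variable names are only illustrative:

# Sketch of the doctest scenario above; assumes an existing SparkContext `sc`
# and the 1.0-era SQLContext API shown in this patch.
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
jsonStrings = [
    '{"field1": 1, "field2": "row1", "field3": {"field4": 11}}',
    '{"field1": 2, "field3": {"field4": 22, "field5": [10, 11]}, "field6": [{"field7": "row2"}]}',
    '{"field1": null, "field2": "row3", "field3": {"field4": 33, "field5": []}}',
]
srdd = sqlCtx.jsonRDD(sc.parallelize(jsonStrings))
sqlCtx.registerRDDAsTable(srdd, "table1")

# field6 is a JSON array of objects; before this patch, selecting it raised
# java.lang.IllegalArgumentException on the Python side (SPARK-2376).
rows = sqlCtx.sql(
    "SELECT field1 AS f1, field2 AS f2, field3 AS f3, field6 AS f4 FROM table1").collect()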
{"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}] True """ def func(split, iterator): @@ -492,8 +496,8 @@ def _test(): globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}', - '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}', - '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}'] + '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}', + '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}'] globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) globs['nestedRdd1'] = sc.parallelize([ http://git-wip-us.apache.org/repos/asf/spark/blob/4352a2fd/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 8f9f54f..8bcfc7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql +import java.util.{Map => JMap, List => JList, Set => JSet} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + import net.razorvine.pickle.Pickler import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext} @@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} -import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType} +import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType} import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} import org.apache.spark.api.java.JavaRDD -import java.util.{Map => JMap} /** * :: AlphaComponent :: @@ -359,6 +363,28 @@ class SchemaRDD( case (obj, (name, dataType)) => dataType match { case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct)) + case array @ ArrayType(struct: StructType) => + val arrayValues = obj match { + case seq: Seq[Any] => + seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava + case list: JList[Any] => + list.map(element => rowToMap(element.asInstanceOf[Row], struct)) + case set: JSet[Any] => + set.map(element => rowToMap(element.asInstanceOf[Row], struct)) + case array if array != null && array.getClass.isArray => + array.asInstanceOf[Array[Any]].map { + element => rowToMap(element.asInstanceOf[Row], struct) + } + case other => other + } + map.put(name, arrayValues) + case array: ArrayType => { + val arrayValues = obj match { + case seq: Seq[Any] => seq.asJava + case other => other + } + map.put(name, arrayValues) + } case other => map.put(name, obj) } } @@ -366,22 +392,11 @@ class SchemaRDD( map } - // TODO: Actually, the schema of a row should be represented by a StructType instead of - // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to - // construct the Map for python. 
@@ -366,22 +392,11 @@ class SchemaRDD(
     map
   }
 
-  // TODO: Actually, the schema of a row should be represented by a StructType instead of
-  // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-  // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
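With javaToPython now delegating to rowToMap over a StructType built from the analyzed output, arrays of JSON objects should arrive in Python as plain lists of dicts. Continuing the sketch above (and assuming collect() returns the three records in input order, as the doctests compare against), the nested values can be reached with ordinary indexing:

# Continuing the earlier sketch: `rows` holds plain Python dicts after collect().
assert rows[1]["f4"] == [{"field7": "row2"}]   # JSON array of objects -> list of dicts
assert rows[1]["f4"][0]["field7"] == "row2"
assert rows[1]["f3"]["field5"] == [10, 11]     # JSON array of numbers -> list
assert rows[0]["f4"] is None                   # field6 is absent in the first record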