Repository: spark Updated Branches: refs/heads/branch-1.1 31090e43c -> 27a8d4ce3
[SPARK-2875] [PySpark] [SQL] handle null in schemaRDD() Handle null in schemaRDD during converting them into Python. Author: Davies Liu <davies....@gmail.com> Closes #1802 from davies/json and squashes the following commits: 88e6b1f [Davies Liu] handle null in schemaRDD() (cherry picked from commit 48789117c2dd6d38e0bd8d21cdbcb989913205a6) Signed-off-by: Michael Armbrust <mich...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27a8d4ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27a8d4ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27a8d4ce Branch: refs/heads/branch-1.1 Commit: 27a8d4ce39aa620a5926b33371fcf03bbcb18698 Parents: 31090e4 Author: Davies Liu <davies....@gmail.com> Authored: Wed Aug 6 11:08:12 2014 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Aug 6 11:08:23 2014 -0700 ---------------------------------------------------------------------- python/pyspark/sql.py | 7 +++++ .../scala/org/apache/spark/sql/SchemaRDD.scala | 27 ++++++++++++-------- 2 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/27a8d4ce/python/pyspark/sql.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index f109370..adc56e7 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1231,6 +1231,13 @@ class SQLContext: ... "field3.field5[0] as f3 from table3") >>> srdd6.collect() [Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)] + + >>> sqlCtx.jsonRDD(sc.parallelize(['{}', + ... '{"key0": {"key1": "value1"}}'])).collect() + [Row(key0=None), Row(key0=Row(key1=u'value1'))] + >>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', + ... '{"key0": {"key1": "value1"}}'])).collect() + [Row(key0=None), Row(key0=Row(key1=u'value1'))] """ def func(iterator): http://git-wip-us.apache.org/repos/asf/spark/blob/27a8d4ce/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 57df793..33b2ed1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -382,21 +382,26 @@ class SchemaRDD( private[sql] def javaToPython: JavaRDD[Array[Byte]] = { import scala.collection.Map - def toJava(obj: Any, dataType: DataType): Any = dataType match { - case struct: StructType => rowToArray(obj.asInstanceOf[Row], struct) - case array: ArrayType => obj match { - case seq: Seq[Any] => seq.map(x => toJava(x, array.elementType)).asJava - case list: JList[_] => list.map(x => toJava(x, array.elementType)).asJava - case arr if arr != null && arr.getClass.isArray => - arr.asInstanceOf[Array[Any]].map(x => toJava(x, array.elementType)) - case other => other - } - case mt: MapType => obj.asInstanceOf[Map[_, _]].map { + def toJava(obj: Any, dataType: DataType): Any = (obj, dataType) match { + case (null, _) => null + + case (obj: Row, struct: StructType) => rowToArray(obj, struct) + + case (seq: Seq[Any], array: ArrayType) => + seq.map(x => toJava(x, array.elementType)).asJava + case (list: JList[_], array: ArrayType) => + list.map(x => toJava(x, array.elementType)).asJava + case (arr, array: ArrayType) if arr.getClass.isArray => + arr.asInstanceOf[Array[Any]].map(x => toJava(x, array.elementType)) + + case (obj: Map[_, _], mt: MapType) => obj.map { case (k, v) => (k, toJava(v, mt.valueType)) // key should be primitive type }.asJava + // Pyrolite can handle Timestamp - case other => obj + case (other, _) => other } + def rowToArray(row: Row, structType: StructType): Array[Any] = { val fields = structType.fields.map(field => field.dataType) row.zip(fields).map { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org