[jira] [Updated] (SPARK-20254) SPARK-19716 generates inefficient Java code from a primitive array of Dataset

2017-04-07 Thread Kazuaki Ishizaki (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kazuaki Ishizaki updated SPARK-20254:
-------------------------------------
Description: 
Since SPARK-19716 newly introduced {{unresolvedmapobjects}}, the current implementation generates {{mapobjects()}} at {{DeserializeToObject}} in the {{Analyzed Logical Plan}}. This {{mapobjects()}} introduces Java code that stores the array into {{GenericArrayData}}, which is inefficient for a primitive array.
cc: [~cloud_fan]
 
{code}
val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count
val ds2 = ds.map(e => e)
ds2.explain(true)
ds2.show
{code}
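For readers outside the Spark codebase, the cost difference can be sketched in plain Java (hypothetical class and method names, not Spark's actual generated code): a {{mapobjects()}}-style deserializer copies the column element by element through boxed values, the shape {{GenericArrayData}} stores, while the old {{toDoubleArray}} path can do one bulk primitive copy.

```java
// Hypothetical illustration of the two copy strategies; NOT Spark source.
import java.util.Arrays;

public class ArrayCopySketch {

    // mapobjects()-style: walk the array element by element through boxed
    // values into an Object[] (as GenericArrayData stores them), then
    // unbox back into a double[].
    static double[] viaBoxedCopy(double[] input) {
        Object[] boxed = new Object[input.length];
        for (int i = 0; i < input.length; i++) {
            boxed[i] = Double.valueOf(input[i]); // one boxing per element
        }
        double[] out = new double[boxed.length];
        for (int i = 0; i < boxed.length; i++) {
            out[i] = (Double) boxed[i];          // one unboxing per element
        }
        return out;
    }

    // toDoubleArray-style: produce the primitive double[] with a single
    // bulk copy and no per-element boxing.
    static double[] viaPrimitiveCopy(double[] input) {
        double[] out = new double[input.length];
        System.arraycopy(input, 0, out, 0, input.length);
        return out;
    }

    public static void main(String[] args) {
        double[] in = {1.1, 2.2};
        // Both strategies yield the same result; only the cost differs.
        System.out.println(Arrays.equals(viaBoxedCopy(in), viaPrimitiveCopy(in))); // prints "true"
    }
}
```

On a large primitive array the boxed path allocates one {{java.lang.Double}} per element, which is the overhead this issue is about.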

Plans before SPARK-19716
{code}
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- 'MapElements , class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: array
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements , class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject cast(value#2 as array).toDoubleArray, obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements , class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
      +- InMemoryRelation [value#2], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
            +- Scan ExternalRDDScan[obj#1]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- *MapElements , obj#24: [D
   +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
      +- *InMemoryTableScan [value#2]
         +- InMemoryRelation [value#2], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- Scan ExternalRDDScan[obj#1]
{code}

Plans after SPARK-19716
{code}
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- 'MapElements , class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: array
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements , class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
      +- SerializeFromObject [staticinvoke(class
{code}

[jira] [Updated] (SPARK-20254) SPARK-19716 generates inefficient Java code from a primitive array of Dataset

2017-04-07 Thread Kazuaki Ishizaki (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kazuaki Ishizaki updated SPARK-20254:
-------------------------------------
Summary: SPARK-19716 generates inefficient Java code from a primitive array of Dataset  (was: SPARK-19716 generate inefficient Java code from a primitive array of Dataset)

> SPARK-19716 generates inefficient Java code from a primitive array of Dataset
> -----------------------------------------------------------------------------
>
> Key: SPARK-20254
> URL: https://issues.apache.org/jira/browse/SPARK-20254
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kazuaki Ishizaki
>