[ https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kazuaki Ishizaki updated SPARK-20254:
-------------------------------------
    Summary: SPARK-19716 generates inefficient Java code from a primitive array of Dataset  (was: SPARK-19716 generate inefficient Java code from a primitive array of Dataset)

> SPARK-19716 generates inefficient Java code from a primitive array of Dataset
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-20254
>                 URL: https://issues.apache.org/jira/browse/SPARK-20254
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Kazuaki Ishizaki
>
> Since SPARK-19716 introduced {{unresolvedmapobjects}}, the current implementation generates {{mapobjects()}} at {{DeserializeToObject}}. This {{mapobjects()}} introduces Java code that stores a primitive array into {{GenericArrayData}} instead of keeping it as a primitive array.
> cc: [~cloud_fan]
> 
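> The root of the inefficiency is that {{GenericArrayData}} is backed by an {{Object[]}}, so every primitive element is boxed on the way in. A minimal sketch of that cost (the object name {{BoxingSketch}} is made up for illustration; {{GenericArrayData}} is Spark's):
> {code:scala}
> import org.apache.spark.sql.catalyst.util.GenericArrayData
> 
> object BoxingSketch {
>   def main(args: Array[String]): Unit = {
>     val primitive = Array(1.1, 2.2)
>     // The mapobjects() loop effectively does this: widen each element to Any,
>     // which boxes the double into a java.lang.Double before it is stored
>     // in an Object-backed array.
>     val boxed: Array[Any] = primitive.map(d => d: Any)
>     val arrayData = new GenericArrayData(boxed)
>     println(arrayData.numElements()) // 2
>   }
> }
> {code}
> The snippet below reproduces the issue: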
> {code}
> val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
> ds.count
> val ds2 = ds.map(e => e)
> ds2.explain(true)
> ds2.show
> {code}
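> The plans below come from {{ds2.explain(true)}}; the generated Java at the end of this report can be dumped with Spark's codegen debug helper (a sketch, run in the same session as the repro):
> {code:scala}
> // Prints the Java source generated for each whole-stage-codegen subtree
> import org.apache.spark.sql.execution.debug._
> ds2.debugCodegen()
> {code}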
> Without SPARK-19716 (no {{unresolvedmapobjects}}), the deserializer is a plain {{toDoubleArray}} call:
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- *InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
> With SPARK-19716, the deserializer goes through {{mapobjects}} instead:
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
> The generated Java for the {{mapobjects}} version copies the array element by element into a {{GenericArrayData}}; the {{/*###*/}} marker flags the extra allocation:
> {code:java}
> ...
> /* 056 */       ArrayData deserializetoobject_value1 = null;
> /* 057 */
> /* 058 */       if (!inputadapter_isNull) {
> /* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
> /* 060 */
> /* 061 */         Double[] deserializetoobject_convertedArray = null;
> /* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
> /* 063 */
> /* 064 */         int deserializetoobject_loopIndex = 0;
> /* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
> /* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
> /* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
> /* 068 */
> /* 069 */           if (MapObjects_loopIsNull2) {
> /* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
> /* 071 */           }
> /* 072 */           if (false) {
> /* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
> /* 074 */           } else {
> /* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
> /* 076 */           }
> /* 077 */
> /* 078 */           deserializetoobject_loopIndex += 1;
> /* 079 */         }
> /* 080 */
> /* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
> /* 082 */       }
> ...
> {code}
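> For comparison, a sketch of what the efficient path amounts to: without {{mapobjects}}, the deserializer is just {{value#2.toDoubleArray}}, and {{ArrayData.toDoubleArray()}} produces a flat {{double[]}} in one bulk step with no per-element boxing. This is a hand-written illustration, not actual codegen output:
> {code:scala}
> import org.apache.spark.sql.catalyst.util.ArrayData
> 
> object EfficientPathSketch {
>   // Roughly what "DeserializeToObject value#2.toDoubleArray" compiles down to:
>   // a single bulk conversion, no boxing loop and no GenericArrayData.
>   def deserialize(input: ArrayData, isNull: Boolean): Array[Double] =
>     if (isNull) null else input.toDoubleArray()
> }
> {code}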


