Kazuaki Ishizaki created SPARK-20254:
----------------------------------------

             Summary: SPARK-19716 generates inefficient Java code for a Dataset of a primitive array
                 Key: SPARK-20254
                 URL: https://issues.apache.org/jira/browse/SPARK-20254
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Kazuaki Ishizaki


Since SPARK-19716 introduced {{unresolvedmapobjects}}, the current implementation generates {{mapobjects()}} in the deserializer of {{DeserializeToObject}} even when the target type is a primitive array. This {{mapobjects()}} emits Java code that copies each element through a boxed {{Double[]}} and stores the result in a {{GenericArrayData}}, instead of copying the primitive array directly.
cc: [~cloud_fan]

{code}
val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count
val ds2 = ds.map(e => e)
ds2.explain(true)
ds2.show
{code}
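
To see why the {{GenericArrayData}} route is wasteful, here is a hypothetical stand-alone Java illustration (not Spark codegen output) of the two ways a {{double[]}} can be placed into Catalyst's {{ArrayData}}: the per-element boxed copy that {{mapobjects()}} performs, and the flat primitive copy that {{UnsafeArrayData.fromPrimitiveArray}} performs.

{code:java}
// Hypothetical stand-alone illustration (not Spark codegen output).
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;

public class PrimitiveArrayCopy {
  public static void main(String[] args) {
    double[] src = {1.1, 2.2};

    // mapobjects() path: every element is autoboxed to java.lang.Double and
    // the result is held as an Object[] inside GenericArrayData.
    Double[] boxed = new Double[src.length];
    for (int i = 0; i < src.length; i++) {
      boxed[i] = src[i];                       // per-element autoboxing
    }
    ArrayData viaMapObjects = new GenericArrayData(boxed);

    // Primitive path: one flat copy of the double[] with no boxing at all.
    ArrayData viaPrimitive = UnsafeArrayData.fromPrimitiveArray(src);

    // Both expose the same values through the ArrayData interface.
    System.out.println(viaMapObjects.getDouble(1));  // 2.2
    System.out.println(viaPrimitive.getDouble(1));   // 2.2
  }
}
{code}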

For reference, without SPARK-19716 the deserializer was a direct {{toDoubleArray}} call:

{code}
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: array<double>
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
      +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- Scan ExternalRDDScan[obj#1]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- *MapElements <function1>, obj#24: [D
   +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
      +- *InMemoryTableScan [value#2]
            +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                     +- Scan ExternalRDDScan[obj#1]

{code}

With SPARK-19716, the same {{ds2.explain(true)}} wraps the deserializer in {{unresolvedmapobjects}}, which is resolved to {{mapobjects}}:

{code}
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: array<double>
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
      +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
         +- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
   +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
      +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- Scan ExternalRDDScan[obj#1]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- *MapElements <function1>, obj#24: [D
   +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
      +- InMemoryTableScan [value#2]
            +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                     +- Scan ExternalRDDScan[obj#1]
{code}

The generated Java code for the {{DeserializeToObject}} above copies the primitive array element by element, through a boxed {{Double[]}}, into a {{GenericArrayData}} (see the line marked {{/*###*/}}):

{code:java}
...
/* 056 */       ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */       if (!inputadapter_isNull) {
/* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */         Double[] deserializetoobject_convertedArray = null;
/* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */         int deserializetoobject_loopIndex = 0;
/* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */           if (MapObjects_loopIsNull2) {
/* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */           }
/* 072 */           if (false) {
/* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */           } else {
/* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */           }
/* 077 */
/* 078 */           deserializetoobject_loopIndex += 1;
/* 079 */         }
/* 080 */
/* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */       }
...
{code}
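
For comparison, here is a simplified hand-written sketch (not actual codegen output) of what the direct {{toDoubleArray}} deserializer in the first plan boils down to: a single primitive copy, with no boxed {{Double[]}} and no {{GenericArrayData}} allocation.

{code:java}
// Simplified hand-written sketch (not actual generated code): the
// non-mapobjects path reduces to one flat primitive copy.
double[] deserializetoobject_value1 = null;
if (!inputadapter_isNull) {
  // ArrayData.toDoubleArray() copies the stored values straight into a double[].
  deserializetoobject_value1 = inputadapter_value.toDoubleArray();
}
{code}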


