[ https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-20254: ------------------------------------ Assignee: (was: Apache Spark) > SPARK-19716 generates unnecessary data conversion for Dataset with primitive > array > ---------------------------------------------------------------------------------- > > Key: SPARK-20254 > URL: https://issues.apache.org/jira/browse/SPARK-20254 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.0 > Reporter: Kazuaki Ishizaki > > Since {{unresolvedmapobjects}} is newly introduced by SPARK-19716, the > current implementation generates {{mapobjects()}} at {{DeserializeToObject}} > in {{Analyzed Logical Plan}}. This {{mapObject()}} introduces Java code to > store an array into {{GenericArrayData}}. > cc: [~cloud_fan] > > {code} > val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache > ds.count > val ds2 = ds.map(e => e) > ds2.explain(true) > ds2.show > {code} > Plans before SPARK-19716 > {code} > == Parsed Logical Plan == > 'SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- 'MapElements <function1>, class [D, > [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- 'DeserializeToObject > unresolveddeserializer(upcast(getcolumnbyordinal(0, > ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: > "scala.Array").toDoubleArray), obj#23: [D > +- SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- ExternalRDD [obj#1] > == Analyzed Logical Plan == > value: array<double> > SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- MapElements <function1>, class [D, > 
[StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, > obj#23: [D > +- SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- ExternalRDD [obj#1] > == Optimized Logical Plan == > SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- MapElements <function1>, class [D, > [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- DeserializeToObject value#2.toDoubleArray, obj#23: [D > +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, > deserialized, 1 replicas) > +- *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- Scan ExternalRDDScan[obj#1] > == Physical Plan == > *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- *MapElements <function1>, obj#24: [D > +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D > +- *InMemoryTableScan [value#2] > +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- Scan ExternalRDDScan[obj#1] > {code} > Plans after SPARK-19716 > {code} > == Parsed Logical Plan == > 'SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), 
fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- 'MapElements <function1>, class [D, > [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- 'DeserializeToObject > unresolveddeserializer(unresolvedmapobjects(<function1>, > getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), > obj#23: [D > +- SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- ExternalRDD [obj#1] > == Analyzed Logical Plan == > value: array<double> > SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- MapElements <function1>, class [D, > [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- DeserializeToObject mapobjects(MapObjects_loopValue4, > MapObjects_loopIsNull4, DoubleType, > assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, > DoubleType, true), - array element class: "scala.Double", - root class: > "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, > obj#23: [D > +- SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- ExternalRDD [obj#1] > == Optimized Logical Plan == > SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- MapElements <function1>, class [D, > [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D > +- DeserializeToObject mapobjects(MapObjects_loopValue4, > MapObjects_loopIsNull4, DoubleType, > assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, > 
DoubleType, true), - array element class: "scala.Double", - root class: > "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, > obj#23: [D > +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, > deserialized, 1 replicas) > +- *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- Scan ExternalRDDScan[obj#1] > == Physical Plan == > *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#25] > +- *MapElements <function1>, obj#24: [D > +- *DeserializeToObject mapobjects(MapObjects_loopValue4, > MapObjects_loopIsNull4, DoubleType, > assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, > DoubleType, true), - array element class: "scala.Double", - root class: > "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, > obj#23: [D > +- InMemoryTableScan [value#2] > +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *SerializeFromObject [staticinvoke(class > org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, > ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS > value#2] > +- Scan ExternalRDDScan[obj#1] > {code} > {code:java} > ... 
> /* 056 */ ArrayData deserializetoobject_value1 = null; > /* 057 */ > /* 058 */ if (!inputadapter_isNull) { > /* 059 */ int deserializetoobject_dataLength = > inputadapter_value.numElements(); > /* 060 */ > /* 061 */ Double[] deserializetoobject_convertedArray = null; > /* 062 */ deserializetoobject_convertedArray = new > Double[deserializetoobject_dataLength]; > /* 063 */ > /* 064 */ int deserializetoobject_loopIndex = 0; > /* 065 */ while (deserializetoobject_loopIndex < > deserializetoobject_dataLength) { > /* 066 */ MapObjects_loopValue2 = (double) > (inputadapter_value.getDouble(deserializetoobject_loopIndex)); > /* 067 */ MapObjects_loopIsNull2 = > inputadapter_value.isNullAt(deserializetoobject_loopIndex); > /* 068 */ > /* 069 */ if (MapObjects_loopIsNull2) { > /* 070 */ throw new RuntimeException(((java.lang.String) > references[0])); > /* 071 */ } > /* 072 */ if (false) { > /* 073 */ > deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; > /* 074 */ } else { > /* 075 */ > deserializetoobject_convertedArray[deserializetoobject_loopIndex] = > MapObjects_loopValue2; > /* 076 */ } > /* 077 */ > /* 078 */ deserializetoobject_loopIndex += 1; > /* 079 */ } > /* 080 */ > /* 081 */ deserializetoobject_value1 = new > org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); > /*###*/ > /* 082 */ } > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org