[ https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anton Okolnychyi updated SPARK-18534:
-------------------------------------
Description: 
There is a problem with user-defined aggregations in the Dataset API in Spark 1.6.3, while the identical code works fine in Spark 2.0.

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same code with a Kryo-based alternative produces a correct result. If the encoder for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not able to read the reduced values in the merge phase of the aggregation.

Code to reproduce:
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

// Does not work with ExpressionEncoder() and produces an empty map as a result
implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
// Will work if a Kryo-based encoder is used instead
// implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

val sparkConf = new SparkConf()
  .setAppName("DS Spark 1.6 Test")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConf)
val sparkSqlContext = new SQLContext(sparkContext)
import sparkSqlContext.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

// Collects the stop points of a line into a Map keyed by sequence number
val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
}.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap
{code}
The code above produces an empty map as a result if the Map encoder is defined as {{ExpressionEncoder()}}. The Kryo-based encoder (commented out in the code) works fine.
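For concreteness, here is a minimal sanity check on {{resultMap}} (my addition, reusing the names from the snippet above; the expected map is inferred from the {{reduce}}/{{merge}} logic and the two input rows):
{code}
// Expected: the two stop points of line "33" reduced into a single map.
// On 1.6.3 with the ExpressionEncoder()-based encoder this assertion fails,
// because resultMap("33") comes back empty; with the Kryo encoder it passes.
assert(resultMap("33") == Map(1 -> "id#1", 2 -> "id#2"))
{code}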
A preliminary investigation was done to find possible reasons for this behavior. I am not a Spark expert, but I hope it helps.

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
      +- TungstenExchange hashpartitioning(value#55,1), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
               +- ConvertToSafe
                  +- Sort [value#55 ASC], false, 0
                     +- !AppendColumns <function1>, class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
                        +- ConvertToUnsafe
                           +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,2000000002,1,2800000004,3333,31236469],[0,2000000002,2,2800000004,3333,32236469]]
{noformat}
Everything up to the first (from the bottom) {{SortBasedAggregate}} step, and part of that step itself, is handled correctly. In particular, I see that each row correctly updates the mutable aggregation buffer in the {{update()}} method of the {{TypedAggregateExpression}} class. My initial idea was that the problem appeared in the {{ConvertToUnsafe}} step directly after the first {{SortBasedAggregate}}.

If I take a look at the {{ConvertToUnsafe}} class, I can see that the first {{SortBasedAggregate}} returns a map with 2 elements (I call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of {{ConvertToUnsafe}} to see this). At the same time, if I examine the output of this {{ConvertToUnsafe}} in the same way as its input, I see that the result map does not contain any elements. As a consequence, Spark operates on two empty maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.
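As an illustration only (this is my own sketch, not the in-place probe inside {{ConvertToUnsafe.doExecute()}} described above), a similar observation can be made from the driver by inspecting the {{InternalRow}}s produced by the executed plan; the map column ordinal 1 is an assumption based on the plan output shown above (grouping key first, aggregated map second):
{code}
// Re-runs the aggregation from the reproduction snippet and inspects the raw rows.
// Only meaningful for the ExpressionEncoder() case, where the aggregated column is
// a catalyst MapType; with the Kryo encoder it is an opaque binary column instead.
val aggregated = stopPointDS.groupBy(_.line).agg(stopPointSequenceMap)
aggregated.queryExecution.executedPlan.execute().collect().foreach { row =>
  // Expected 2 elements for line "33", but 0 are reported when the bug is hit.
  println(s"aggregated map has ${row.getMap(1).numElements()} element(s)")
}
{code}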
However, my assumption turned out to be only partially correct. I did a more detailed investigation and its outcomes are described in the comments.


> Datasets Aggregation with Maps
> ------------------------------
>
>                 Key: SPARK-18534
>                 URL: https://issues.apache.org/jira/browse/SPARK-18534
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 1.6.3
>            Reporter: Anton Okolnychyi
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)