[ https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Okolnychyi updated SPARK-18534:
-------------------------------------
    Description: 
There is a problem with user-defined aggregations in the Dataset API in Spark 1.6.3; the identical code works fine in Spark 2.0.

The problem appears only if {{ExpressionEncoder()}} is used for Maps; the same code with a Kryo-based alternative produces the correct result. If the encoder for a map is defined via {{ExpressionEncoder()}}, Spark cannot read the reduced values in the merge phase of the aggregation.

Code to reproduce:
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

// Does not work with ExpressionEncoder() and produces an empty map as a result
implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
// Will work if a Kryo-based encoder is used instead
// implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

val sparkConf = new SparkConf()
  .setAppName("DS Spark 1.6 Test")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConf)
val sparkSqlContext = new SQLContext(sparkContext)

import sparkSqlContext.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

// Collects (sequenceNumber -> id) pairs of each group into a single map
val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
}.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap
{code}

The code above produces an empty map as a result if the Map encoder is defined as {{ExpressionEncoder()}}. The Kryo-based encoder (commented out in the code) works fine.
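
For reference, this is the result I would expect versus what I observe (the expected map follows from applying the aggregator above to the two input rows by hand):
{code}
// Expected, and what the Kryo-based encoder returns:
//   Map("33" -> Map(1 -> "id#1", 2 -> "id#2"))
// Observed with ExpressionEncoder(): the per-group map comes back empty
//   Map("33" -> Map())
println(resultMap)
{code}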

I did a preliminary investigation into possible reasons for this behavior. I am not a Spark expert, but I hope it helps.

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
      +- TungstenExchange hashpartitioning(value#55,1), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
               +- ConvertToSafe
                  +- Sort [value#55 ASC], false, 0
                     +- !AppendColumns <function1>, class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
                        +- ConvertToUnsafe
                           +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,2000000002,1,2800000004,3333,31236469],[0,2000000002,2,2800000004,3333,32236469]]
{noformat}
 
Everything up to the first (bottom-most) {{SortBasedAggregate}} step, and part of that step, is handled correctly. In particular, I can see that each row correctly updates the mutable aggregation buffer in the {{update()}} method of the {{TypedAggregateExpression}} class. My initial idea was that the problem appeared in the {{ConvertToUnsafe}} step directly after the first {{SortBasedAggregate}}. Looking at the {{ConvertToUnsafe}} class, I can see that the first {{SortBasedAggregate}} returns a map with 2 elements (I call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of {{ConvertToUnsafe}} to see this). At the same time, if I examine the output of this {{ConvertToUnsafe}} in the same way as its input, the resulting map does not contain any elements. As a consequence, Spark operates on two empty maps in the {{merge()}} method of the {{TypedAggregateExpression}} class. However, this assumption turned out to be only partially correct; the outcomes of a more detailed investigation are described in the comments.
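
To illustrate the probe mentioned above, this is roughly how I inspected the input and output of {{ConvertToUnsafe}} (a debugging sketch patched into Spark 1.6's {{ConvertToUnsafe.doExecute()}}; ordinal 1 in {{getMap(1)}} refers to the aggregation buffer column {{value#60}}):
{code}
protected override def doExecute(): RDD[InternalRow] = {
  // Probe the rows coming from the partial SortBasedAggregate:
  // the reduced map still contains both entries here (prints 2).
  println(child.execute().collect()(0).getMap(1).numElements())

  // Original conversion logic of ConvertToUnsafe in Spark 1.6.
  val result = child.execute().mapPartitions { iter =>
    val convertToUnsafe = UnsafeProjection.create(child.schema)
    iter.map(convertToUnsafe)
  }

  // The same probe applied to the converted rows prints 0,
  // i.e. the map entries are lost after the conversion:
  // println(result.collect()(0).getMap(1).numElements())
  result
}
{code}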



> Datasets Aggregation with Maps
> ------------------------------
>
>                 Key: SPARK-18534
>                 URL: https://issues.apache.org/jira/browse/SPARK-18534
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 1.6.3
>            Reporter: Anton Okolnychyi
>


