Hi all,

I am trying to use a custom Aggregator on a GroupedDataset of case classes
to build a hash map with Spark SQL 1.6.2.
When I define my Encoder[Map[Int, String]] via ExpressionEncoder(), it is
unable to reconstruct the reduced values.
However, everything works fine if I define it as
Encoders.kryo[Map[Int, String]].
I would like to know whether I am doing anything wrong.

I have the following use case:

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()

  val sparkContext = ...
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  case class StopPoint(line: String, sequenceNumber: Int, id: String)

  val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2, "2")).toDS()

  val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: StopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
    .groupBy(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap

In spark.sql.execution.TypedAggregateExpression.scala, I can see that each
entry is inserted into the initial map correctly (i.e. the reduce() method
works properly).
However, my encoder cannot reconstruct the map produced in the reduce phase
once the merge phase starts, so merge() receives an empty Map and the final
result is empty as well.
If I replace my expression-based encoder with
org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I get the correct
result:

  (33, Map(1 -> 1, 2 -> 2))
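
For reference, the working variant only swaps the encoder definition; the
rest of the snippet above stays the same (a minimal sketch, assuming the
Spark 1.6.x imports):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-backed encoder: the aggregation buffer and result Map are
// serialized as a single opaque binary column instead of being mapped
// to Catalyst's internal row representation.
implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
  Encoders.kryo[Map[Int, String]]
```

With the kryo encoder the map survives the reduce-to-merge handoff, at the
cost of the column being opaque to Catalyst.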

Any ideas/suggestions are more than welcome.

Sincerely,
Anton Okolnychyi
