Hi, I have experienced a problem using the Datasets API in Spark 1.6, while almost identical code works fine in Spark 2.0. The problem is related to encoders and custom aggregators.
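For reference, here is the case class both snippets use (reconstructed from how it is accessed below via _.line, stopPoint.sequenceNumber and stopPoint.id; it was not part of my original paste):

case class TestStopPoint(line: String, sequenceNumber: Int, id: String)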
*Spark 1.6 (the aggregation produces an empty map):*

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Encoder used for both the aggregation buffer and the result.
implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
// Kryo-based workaround that makes it work:
// implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

val sparkConf = new SparkConf()
  .setAppName("IVU DS Spark 1.6 Test")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConf)
val sparkSqlContext = new SQLContext(sparkContext)
import sparkSqlContext.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

// Collects (sequenceNumber -> id) pairs into a map, one map per group.
val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
}.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap

Stepping through spark.sql.execution.TypedAggregateExpression.scala, I can see that the reduce phase is performed correctly, but Spark cannot read the reduced values back in the merge phase. If I replace the ExpressionEncoder with the Kryo-based one (commented out in the snippet above), everything works fine.

*Spark 2.0 (works correctly):*

import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession
  .builder()
  .appName("IVU DS Spark 2.0 Test")
  .config("spark.sql.warehouse.dir", "file:///D://sparkSql")
  .master("local[4]")
  .getOrCreate()
import spark.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
  // In 2.0 the encoders are part of the Aggregator contract:
  override def bufferEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
}.toColumn

val resultMap = stopPointDS
  .groupByKey(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap

I know that Spark 1.6 only shipped a preview of the Datasets concept and that a lot changed in 2.0. Still, I would like to know whether I am doing anything wrong in my 1.6 code.

Thanks in advance,
Anton
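P.S. To make the failure concrete: given the two input rows above, the result I expect (and what the 2.0 version returns) is

  resultMap == Map("33" -> Map(1 -> "id#1", 2 -> "id#2"))

whereas, if I read my debug output correctly, the 1.6 version with the ExpressionEncoder yields an empty map for the group:

  resultMap == Map("33" -> Map())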