Hi,
I have experienced a problem using the Datasets API in Spark 1.6, while
almost identical code works fine in Spark 2.0.
The problem is related to encoders and custom aggregators.
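For reference, TestStopPoint in both examples is just a plain case class along these lines (a simplified sketch; the field names match the usage below):

// Sketch of the test case class; fields as used in the examples below.
case class TestStopPoint(line: String, sequenceNumber: Int, id: String)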
*Spark 1.6 (the aggregation produces an empty map):*
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
  ExpressionEncoder()
// implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
//   org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

val sparkConf = new SparkConf()
  .setAppName("IVU DS Spark 1.6 Test")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConf)
val sparkSqlContext = new SQLContext(sparkContext)

import sparkSqlContext.implicits._

val stopPointDS = Seq(
  TestStopPoint("33", 1, "id#1"),
  TestStopPoint("33", 2, "id#2")).toDS()

val stopPointSequenceMap =
  new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap
Stepping through spark.sql.execution.TypedAggregateExpression.scala, I can
see that the reduce step is performed correctly, but Spark cannot read the
reduced values back during the merge phase.
If I replace the ExpressionEncoder with the Kryo-based one (commented out
in the code above), everything works fine.
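To be explicit, the workaround is simply to swap the implicit encoder for the Kryo-based one:

// 1.6 workaround: use a Kryo-based encoder for the Map buffer instead of
// ExpressionEncoder(); with this in scope the aggregation returns the
// expected map.
implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
  org.apache.spark.sql.Encoders.kryo[Map[Int, String]]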
*Spark 2.0 (works correctly):*
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession
  .builder()
  .appName("IVU DS Spark 2.0 Test")
  .config("spark.sql.warehouse.dir", "file:///D://sparkSql")
  .master("local[4]")
  .getOrCreate()

import spark.implicits._

val stopPointDS = Seq(
  TestStopPoint("33", 1, "id#1"),
  TestStopPoint("33", 2, "id#2")).toDS()

val stopPointSequenceMap =
  new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
    override def bufferEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
    override def outputEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  }.toColumn

val resultMap = stopPointDS
  .groupByKey(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap
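Just to make the expected output explicit, for the two test rows both versions should produce:

// Expected result; in the 1.6 version with ExpressionEncoder() the inner
// map comes back empty instead.
assert(resultMap == Map("33" -> Map(1 -> "id#1", 2 -> "id#2")))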
I know that Spark 1.6 only has a preview of the Datasets concept and that a
lot has changed in 2.0, but I would still like to know whether I am doing
anything wrong in my 1.6 code.
Thanks in advance,
Anton