Hi,

I have experienced a problem using the Datasets API in Spark 1.6, while
almost identical code works fine in Spark 2.0.
The problem is related to encoders and custom aggregators.
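
For reference, TestStopPoint is just a plain case class along these lines (fields as used in the snippets below):

  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)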

*Spark 1.6 (the aggregation produces an empty map):*

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.{Encoder, SQLContext}
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
  //   org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
    .setAppName("IVU DS Spark 1.6 Test")
    .setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(
    TestStopPoint("33", 1, "id#1"),
    TestStopPoint("33", 2, "id#2")
  ).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
    .groupBy(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap

In spark.sql.execution.TypedAggregateExpression.scala I can see that the reduce
step is performed correctly, but Spark cannot read the reduced values back
during the merge phase, so the final map comes out empty. If I replace the
ExpressionEncoder with the Kryo-based one (commented out in the code above), it
works fine.
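
So for 1.6 the only change needed to make it work is swapping that one implicit, roughly:

  // Workaround for 1.6: Kryo-serialize the Map buffer instead of using the
  // generated ExpressionEncoder
  implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
    org.apache.spark.sql.Encoders.kryo[Map[Int, String]]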

*Spark 2.0 (works correctly):*

  import org.apache.spark.sql.{Encoder, SparkSession}
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator

  val spark = SparkSession
    .builder()
    .appName("IVU DS Spark 2.0 Test")
    .config("spark.sql.warehouse.dir", "file:///D://sparkSql")
    .master("local[4]")
    .getOrCreate()

  import spark.implicits._

  val stopPointDS = Seq(
    TestStopPoint("33", 1, "id#1"),
    TestStopPoint("33", 2, "id#2")
  ).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
    override def bufferEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
    override def outputEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  }.toColumn

  val resultMap = stopPointDS
    .groupByKey(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap
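
With the sample data above, the 2.0 version returns what I expect (and what I would also expect from the 1.6 code), roughly:

  // resultMap: Map[String, Map[Int, String]]
  // Map("33" -> Map(1 -> "id#1", 2 -> "id#2"))
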
I know that Spark 1.6 only shipped a preview of the Datasets concept and that a
lot has changed in 2.0. Still, I would like to know whether I am doing anything
wrong in my 1.6 code.

Thanks in advance,
Anton
