I broke some of your code down into the following lines:

import spark.implicits._
val a: RDD[Messages] = sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp + "-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have the types you think for the combineByKey. I recommend breaking the code down statement by statement like this when you get into a dance with the Scala type system. The type safety you're after (which eventually makes life *easier*) is best supported by Dataset, and would have prevented the .id vs .Id error, although there are some performance tradeoffs vs RDD and DataFrame...

On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Hi,
>
> Is there any issue in the below code?
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> // Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) =>
>   accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>   (accumulator1: Set[String], accumulator2: Set[String]) =>
>     accumulator1 ++ accumulator2
>
> sc.parallelize(messages)
>   .map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
> found    : (String, String) => scala.collection.immutable.Set[String]
> required:
> ((String, String)) => ?
> sc.parallelize(messages)
>   .map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh

--
Thanks,
Jason
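The compile error above can be illustrated and fixed without a Spark cluster. This is a minimal pure-Scala sketch of the corrected function types: `combineByKey` expects `createCombiner: V => C`, `mergeValue: (C, V) => C`, and `mergeCombiners: (C, C) => C`, where here `V = (String, String)` and `C = Set[String]`. The `combineLocally` helper below is a hypothetical local stand-in for `RDD.combineByKey` (not a Spark API), included only so the three functions can be type-checked and run against a plain `Seq`.

```scala
case class Messages(timeStamp: Int, Id: String, value: String)

object CombineByKeyTypes {
  // The original createCombiner was a TWO-argument function,
  //   (id: String, value: String) => Set(value)
  // which has type (String, String) => Set[String]. combineByKey needs
  // V => C, i.e. a ONE-argument function taking the whole (String, String)
  // tuple -- hence the "required: ((String, String)) => ?" error.
  val createCombiner: ((String, String)) => Set[String] = v => Set(v._2)

  // (C, V) => C: fold one value into an existing combiner.
  val mergeValue: (Set[String], (String, String)) => Set[String] =
    (acc, v) => acc + v._2

  // (C, C) => C: merge two partial combiners.
  val mergeCombiners: (Set[String], Set[String]) => Set[String] = _ ++ _

  // Hypothetical local stand-in for RDD.combineByKey, with the same
  // function-parameter shapes, so the types above can be exercised.
  def combineLocally[K, V, C](pairs: Seq[(K, V)])(
      create: V => C, merge: (C, V) => C, combine: (C, C) => C): Map[K, C] =
    pairs.groupBy(_._1).map { case (k, vs) =>
      k -> vs.tail.foldLeft(create(vs.head._2))((acc, p) => merge(acc, p._2))
    }

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      Messages(0, "d1", "t1"), Messages(0, "d1", "t1"),
      Messages(0, "d1", "t2"), Messages(0, "d2", "t1"))

    // Note the capital .Id (matching the case-class field), and that the
    // original call passed `mergeValue` while the definition was named
    // `mergeValue0` -- the names must agree as well.
    val result = combineLocally(
      messages.map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value))))(
      createCombiner, mergeValue, mergeCombiners)

    assert(result("0-d1") == Set("t1", "t2"))
    assert(result("0-d2") == Set("t1"))
    println(result)
  }
}
```

On an actual RDD, the same three corrected values can be passed straight to `combineByKey`; only `createCombiner`'s uncurried-vs-tupled shape and the `.id`/`mergeValue` naming needed to change, not the aggregation logic.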