Hi,

Thank you for the details. That was a typo while composing the mail. Below is the actual flow.
Any idea why combineByKey is not working? aggregateByKey is working.

// Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
  accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (accumulator1: Set[String], accumulator2: Set[String]) =>
    accumulator1 ++ accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
found   : (String, String) => scala.collection.immutable.Set[String]
required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*
val result = sc.parallelize(messages)
  .map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .aggregateByKey(Set[String]())(
    (aggr, value) => aggr ++ Set(value._2),
    (aggr1, aggr2) => aggr1 ++ aggr2)
  .collect().toMap

print(result)
Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 -> Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin <jasonnerot...@gmail.com> wrote:

> I broke some of your code down into the following lines:
>
> import spark.implicits._
>
> val a: RDD[Messages] = sc.parallelize(messages)
> val b: Dataset[Messages] = a.toDF.as[Messages]
> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
> "-" + x.Id, (x.Id, x.value))}
>
> You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't
> have the types you think for the combineByKey.
>
> I recommend breaking the code down like this, statement by statement,
> when you get into a dance with the Scala type system.
>
> The type-safety that you're after (that eventually makes life *easier*) is
> best supported by Dataset (would have prevented the .id vs .Id error).
> Although there are some performance tradeoffs vs RDD and DataFrame...
>
> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> Any issue in the below code.
>>
>> case class Messages(timeStamp: Int, Id: String, value: String)
>>
>> val messages = Array(
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t3"),
>>   Messages(0, "d1", "t4"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t5"),
>>   Messages(0, "d2", "t6"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t2")
>> )
>>
>> // Defining createCombiner, mergeValue and mergeCombiner functions
>> def createCombiner = (id: String, value: String) => Set(value)
>>
>> def mergeValue0 = (accumulator1: Set[String],
>>   accumulator2: (String, String)) => accumulator1 ++ Set(accumulator2._2)
>>
>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>   (accumulator1: Set[String], accumulator2: Set[String]) =>
>>     accumulator1 ++ accumulator2
>>
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value)))
>>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> *Compile Error:-*
>> found   : (String, String) => scala.collection.immutable.Set[String]
>> required: ((String, String)) => ?
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value)))
>>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> Regards,
>> Rajesh
>
> --
> Thanks,
> Jason
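For reference, the compile error is about arity, not about the element types. Spark's combineByKey expects `createCombiner: V => C`, a function of ONE parameter whose type is the whole value. Here the value is the tuple `(String, String)`, so the combiner must be `((String, String)) => Set[String]`, whereas the posted `createCombiner` is a two-parameter function `(String, String) => Set[String]` — exactly the mismatch the error message reports. The sketch below checks the corrected function shapes with plain Scala; `combineByKeyLocal` is a hypothetical single-machine stand-in for `RDD.combineByKey` (same parameter shapes), used only so the logic can run without a SparkContext.

```scala
// Corrected shapes: createCombiner takes ONE argument, the whole (Id, value)
// tuple, matching "required: ((String, String)) => ?" from the error.
val createCombiner: ((String, String)) => Set[String] = v => Set(v._2)

val mergeValue: (Set[String], (String, String)) => Set[String] =
  (acc, v) => acc + v._2

val mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (a, b) => a ++ b

case class Message(timeStamp: Int, Id: String, value: String)

// Hypothetical local stand-in for RDD.combineByKey, with the same
// parameter shapes; groups by key, seeds each group with createCombiner
// on the first value, then folds in the rest with mergeValue.
def combineByKeyLocal[K, V, C](pairs: Seq[(K, V)])(
    create: V => C,
    merge: (C, V) => C,
    combine: (C, C) => C): Map[K, C] =
  pairs.groupBy(_._1).map { case (k, kvs) =>
    val vs = kvs.map(_._2)
    k -> vs.tail.foldLeft(create(vs.head))(merge)
  }

val messages = Seq(
  Message(0, "d1", "t1"), Message(0, "d1", "t2"),
  Message(0, "d1", "t3"), Message(0, "d2", "t1"))

val result = combineByKeyLocal(
  messages.map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value))))(
  createCombiner, mergeValue, mergeCombiner)
```

With `createCombiner` declared this way, the original `combineByKey(createCombiner, mergeValue, mergeCombiner)` call on the real RDD should compile as-is (note the posted code also defines `mergeValue0` but passes `mergeValue`, which would be the next error to fix).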