Take a look at this Stack Overflow question: https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work
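The gist of what that question covers, sketched without a cluster: aggregateByKey(zero)(seqOp, combOp) folds each key's values into the zero value with seqOp within a partition, then merges per-partition results with combOp. The helper below (aggregateByKeyLocal is an illustrative name, not Spark API) simulates that over a local Seq:

```scala
// Local stand-in for aggregateByKey semantics: seqOp folds values into zero
// within a "partition"; combOp merges the per-partition results.
def aggregateByKeyLocal[K, V, U](pairs: Seq[(K, V)])(zero: U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  // Split into two "partitions" so combOp is actually exercised
  val (p1, p2) = pairs.splitAt(pairs.length / 2)
  def foldPart(p: Seq[(K, V)]): Map[K, U] =
    p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) }
  val (m1, m2) = (foldPart(p1), foldPart(p2))
  (m1.keySet ++ m2.keySet).map { k =>
    k -> ((m1.get(k), m2.get(k)) match {
      case (Some(a), Some(b)) => combOp(a, b) // key seen in both partitions
      case (Some(a), None)    => a
      case (None, b)          => b.getOrElse(zero)
    })
  }.toMap
}

val data = Seq("d1" -> "t1", "d1" -> "t2", "d2" -> "t1")
val res  = aggregateByKeyLocal(data)(Set.empty[String])(_ + _, _ ++ _)
```

Here `res` collects each key's values into a Set, mirroring what the aggregateByKey call further down the thread does on an RDD.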
On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Hi,
>
> Thank you for the details. That was a typo while composing the mail; below is the actual flow.
>
> Any idea why combineByKey is not working? aggregateByKey is working.
>
> // Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
>   accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>   (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
> found    : (String, String) => scala.collection.immutable.Set[String]
> required : ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>   .aggregateByKey(Set[String]())(
>     (aggr, value) => aggr ++ Set(value._2),
>     (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap
>
> print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 -> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin <jasonnerot...@gmail.com> wrote:
>
>> I broke some of your code down into the following lines:
>>
>> import spark.implicits._
>>
>> val a: RDD[Messages] = sc.parallelize(messages)
>> val b: Dataset[Messages] = a.toDF.as[Messages]
>> val c: Dataset[(String, (String, String))] =
>>   b.map { x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)) }
>>
>> You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have the types you think for the combineByKey.
>> I recommend breaking the code down statement-by-statement like this when you get into a dance with the Scala type system.
>>
>> The type safety that you're after (which eventually makes life *easier*) is best supported by Dataset; it would have prevented the .id vs .Id error. Although there are some performance tradeoffs vs RDD and DataFrame...
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is there any issue in the below code?
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t3"),
>>>   Messages(0, "d1", "t4"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t5"),
>>>   Messages(0, "d2", "t6"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t2")
>>> )
>>>
>>> // Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) =>
>>>   accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>>   (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2
>>>
>>> sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
>>>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile Error:-*
>>> found    : (String, String) => scala.collection.immutable.Set[String]
>>> required : ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
>>>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh

--
Thanks,
Jason
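For anyone hitting the same error: combineByKey expects createCombiner: V => C, a function of ONE argument taking the whole value (here the (Id, value) tuple), while the definitions in the thread are two-argument functions. That is exactly what the mismatch between `found: (String, String) => Set[String]` and `required: ((String, String)) => ?` reports. (The call also passes mergeValue while the definition is named mergeValue0.) Below is a cluster-free sketch with the corrected arities; combineByKeyLocal is an illustrative helper that mimics combineByKey over a local Seq, not Spark API:

```scala
// Corrected shapes: createCombiner takes ONE argument, the whole (Id, value) tuple.
val createCombiner: ((String, String)) => Set[String] = v => Set(v._2)
val mergeValue: (Set[String], (String, String)) => Set[String] = (acc, v) => acc + v._2
val mergeCombiners: (Set[String], Set[String]) => Set[String] = _ ++ _

// Local stand-in for combineByKey: one "partition", so only create and merge
// are exercised here; Spark would apply mergeC across partitions.
def combineByKeyLocal[K, V, C](pairs: Seq[(K, V)])(
    create: V => C, merge: (C, V) => C, mergeC: (C, C) => C): Map[K, C] =
  pairs.groupBy(_._1).map { case (k, kvs) =>
    val vs = kvs.map(_._2)
    k -> vs.tail.foldLeft(create(vs.head))(merge)
  }

// Same data shape as the thread: (timeStamp, Id, value), keyed by "ts-Id"
val messages = Seq(
  (0, "d1", "t1"), (0, "d1", "t1"), (0, "d1", "t2"), (0, "d1", "t3"), (0, "d1", "t4"),
  (0, "d2", "t1"), (0, "d2", "t5"), (0, "d2", "t6"), (0, "d2", "t2"),
  (0, "d3", "t1"), (0, "d3", "t2"))

val pairs  = messages.map { case (ts, id, v) => (s"$ts-$id", (id, v)) }
val result = combineByKeyLocal(pairs)(createCombiner, mergeValue, mergeCombiners)
// Duplicate values collapse because the combiner is a Set
```

With these one-argument/two-argument shapes, the original Spark call should compile as written: sc.parallelize(messages).map(...).combineByKey(createCombiner, mergeValue, mergeCombiners).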