Hi,

Thank you for the details. That was a typo I made while composing the mail.
Below is the actual flow.

Any idea why combineByKey is not compiling? aggregateByKey is working.

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
  accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (accumulator1: Set[String], accumulator2: Set[String]) =>
    accumulator1 ++ accumulator2

sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
     sc.parallelize(messages).map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
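
From the error, it looks like combineByKey wants createCombiner as a
one-argument function over the value tuple (V => C), not a two-argument
function, while aggregateByKey's seqOp really is binary, which would explain
why only the aggregateByKey version below compiles. Is something like this
the expected shape (untested sketch, reusing mergeCombiner from above)?

def createCombiner: ((String, String)) => Set[String] =
  value => Set(value._2)

def mergeValue: (Set[String], (String, String)) => Set[String] =
  (accumulator, value) => accumulator ++ Set(value._2)

sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)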

*aggregateByKey =>*

val result = sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .aggregateByKey(Set[String]())(
    (aggr, value) => aggr ++ Set(value._2),
    (aggr1, aggr2) => aggr1 ++ aggr2)
  .collect().toMap

print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin <jasonnerot...@gmail.com>
wrote:

> I broke some of your code down into the following lines:
>
>     import org.apache.spark.rdd.RDD
>     import org.apache.spark.sql.Dataset
>     import spark.implicits._
>
>     val a: RDD[Messages] = sc.parallelize(messages)
>     val b: Dataset[Messages] = a.toDF.as[Messages]
>     val c: Dataset[(String, (String, String))] =
>       b.map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>
> You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't
> have the types you think they do for the combineByKey.
>
> I recommend breaking the code down statement by statement like this when
> you get into a dance with the Scala type system.
>
> The type safety that you're after (which eventually makes life *easier*)
> is best supported by Dataset; it would have prevented the .id vs .Id
> error. There are some performance tradeoffs vs RDD and DataFrame,
> though...
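>
> If it helps, here's a rough sketch of the typed route continuing from c
> above (untested; I believe the implicit Set encoder needs Spark 2.3+):
>
>     // Group by the composite key, then collapse each group's values
>     // into a Set of the value strings.
>     val d: Dataset[(String, Set[String])] =
>       c.groupByKey(_._1).mapGroups { (key, rows) =>
>         (key, rows.map(_._2._2).toSet)
>       }
>     val result: Map[String, Set[String]] = d.collect().toMap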
>
> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> Is there any issue in the code below?
>>
>> case class Messages(timeStamp: Int, Id: String, value: String)
>>
>> val messages = Array(
>>       Messages(0, "d1", "t1"),
>>       Messages(0, "d1", "t1"),
>>       Messages(0, "d1", "t1"),
>>       Messages(0, "d1", "t1"),
>>       Messages(0, "d1", "t2"),
>>       Messages(0, "d1", "t2"),
>>       Messages(0, "d1", "t3"),
>>       Messages(0, "d1", "t4"),
>>       Messages(0, "d2", "t1"),
>>       Messages(0, "d2", "t1"),
>>       Messages(0, "d2", "t5"),
>>       Messages(0, "d2", "t6"),
>>       Messages(0, "d2", "t2"),
>>       Messages(0, "d2", "t2"),
>>       Messages(0, "d3", "t1"),
>>       Messages(0, "d3", "t1"),
>>       Messages(0, "d3", "t2")
>>     )
>>
>> //Defining createCombiner, mergeValue and mergeCombiner functions
>> def createCombiner = (id: String, value: String) => Set(value)
>>
>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>> String)) => accumulator1 ++ Set(accumulator2._2)
>>
>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>> accumulator2
>>
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> *Compile Error:-*
>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>  required: ((String, String)) => ?
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> Regards,
>> Rajesh
>>
>>
>
> --
> Thanks,
> Jason
>
