I broke some of your code down into the following lines:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Dataset
    import spark.implicits._

    val a: RDD[Messages] = sc.parallelize(messages)
    val b: Dataset[Messages] = a.toDF.as[Messages]
    val c: Dataset[(String, (String, String))] =
      b.map { x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)) }

You wrote .id where the case-class field is .Id, the call passes mergeValue
but you defined mergeValue0, and createCombiner doesn't have the type
combineByKey expects: it has to take a single (String, String) tuple
argument, not two separate String arguments.
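
For reference, here's a rough sketch of how the three functions could be
declared so the combineByKey call type-checks (assuming you want the set of
distinct values per "timeStamp-Id" key, as your code suggests):

    // combineByKey on an RDD[(K, V)] expects:
    //   createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C
    // Here V is the (String, String) tuple, so createCombiner takes ONE tuple.
    val createCombiner: ((String, String)) => Set[String] = v => Set(v._2)
    val mergeValue: (Set[String], (String, String)) => Set[String] =
      (acc, v) => acc + v._2
    val mergeCombiners: (Set[String], Set[String]) => Set[String] = _ ++ _

    val combined = sc.parallelize(messages)
      .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
      .combineByKey(createCombiner, mergeValue, mergeCombiners)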

I recommend breaking the code down statement by statement like this whenever
you get into a dance with the Scala type system.

The type safety you're after (the kind that eventually makes life *easier*)
is best supported by Dataset; it would have caught the .id vs .Id mistake at
compile time, although there are some performance trade-offs versus RDD and
DataFrame.
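
For example, here's a minimal typed-Dataset sketch of the same aggregation,
reusing the b Dataset from the breakdown above (the List-per-key result is
just one possible shape); a typo like x.id simply wouldn't compile:

    // Group by the same "timeStamp-Id" key and collect distinct values.
    // Misspelling x.Id as x.id here is a compile-time error.
    val perKey: Dataset[(String, List[String])] = b
      .groupByKey(x => x.timeStamp + "-" + x.Id)
      .mapGroups((key, msgs) => (key, msgs.map(_.value).toList.distinct))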

On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Any issue in the below code.
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>       Messages(0, "d1", "t1"),
>       Messages(0, "d1", "t1"),
>       Messages(0, "d1", "t1"),
>       Messages(0, "d1", "t1"),
>       Messages(0, "d1", "t2"),
>       Messages(0, "d1", "t2"),
>       Messages(0, "d1", "t3"),
>       Messages(0, "d1", "t4"),
>       Messages(0, "d2", "t1"),
>       Messages(0, "d2", "t1"),
>       Messages(0, "d2", "t5"),
>       Messages(0, "d2", "t6"),
>       Messages(0, "d2", "t2"),
>       Messages(0, "d2", "t2"),
>       Messages(0, "d3", "t1"),
>       Messages(0, "d3", "t1"),
>       Messages(0, "d3", "t2")
>     )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh
>
>

-- 
Thanks,
Jason
