Hi, Any issue in the below code.
case class Messages(timeStamp: Int, Id: String, value: String) val messages = Array( Messages(0, "d1", "t1"), Messages(0, "d1", "t1"), Messages(0, "d1", "t1"), Messages(0, "d1", "t1"), Messages(0, "d1", "t2"), Messages(0, "d1", "t2"), Messages(0, "d1", "t3"), Messages(0, "d1", "t4"), Messages(0, "d2", "t1"), Messages(0, "d2", "t1"), Messages(0, "d2", "t5"), Messages(0, "d2", "t6"), Messages(0, "d2", "t2"), Messages(0, "d2", "t2"), Messages(0, "d3", "t1"), Messages(0, "d3", "t1"), Messages(0, "d3", "t2") ) //Defining createCombiner, mergeValue and mergeCombiner functions def createCombiner = (id: String, value: String) => Set(value) def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) => accumulator1 ++ Set(accumulator2._2) def mergeCombiner: (Set[String], Set[String]) => Set[String] = (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2 sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner) *Compile Error:-* found : (String, String) => scala.collection.immutable.Set[String] required: ((String, String)) => ? sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner) Regards, Rajesh