Re: combineByKey
Take a look at this SOF:
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Any idea why the combineByKey is not working? aggregateByKey is working.

--
Thanks,
Jason
Re: combineByKey
Hi,

Thank you for the details. The lowercase .id was a typo introduced while composing the mail; below is the actual flow.

Any idea why the combineByKey is still not compiling? The equivalent aggregateByKey works.

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
  accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
found   : (String, String) => scala.collection.immutable.Set[String]
required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*

val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value)))
  .aggregateByKey(Set[String]())(
    (aggr, value) => aggr ++ Set(value._2),
    (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap

print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 -> Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin wrote:

> I recommend breaking the code down statement-by-statement when you get into a dance with the Scala type system.
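The compile error above comes from the shape of createCombiner: combineByKey expects createCombiner: V => C, a one-argument function over the whole value, and here V is the tuple (String, String). A two-parameter lambda (Id: String, value: String) => ... has type (String, String) => Set[String], which is not ((String, String)) => Set[String]. A minimal sketch of a version that should compile, keeping the poster's field names (an illustration, not the thread's final code):

// Minimal sketch: createCombiner must take the whole value tuple as one
// argument, because the RDD's value type V is (String, String) here.
def createCombiner = (v: (String, String)) => Set(v._2)

def mergeValue = (acc: Set[String], v: (String, String)) => acc + v._2

def mergeCombiner = (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2

sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

An equivalent spelling is pattern-matching syntax for the single tuple argument, e.g. { case (_, value) => Set(value) }.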
Re: combineByKey
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages] = sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] =
  b.map{x => (x.timeStamp + "-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have the types you think for the combineByKey.

I recommend breaking the code down statement-by-statement like this when you get into a dance with the Scala type system.

The type-safety that you're after (that eventually makes life *easier*) is best supported by Dataset, which would have prevented the .id vs .Id error, although there are some performance tradeoffs vs RDD and DataFrame...

On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Hi,
>
> Any issue in the below code?
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) =>
>   accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>   (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
> found   : (String, String) => scala.collection.immutable.Set[String]
> required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh

--
Thanks,
Jason
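For the Dataset route Jason describes, a hedged sketch of the same distinct-values-per-key aggregation with typed operators, assuming a Spark 2.x SparkSession named spark (an illustration, not the thread's code); here a mistyped field name fails at compile time rather than deep inside type inference:

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch only: build a Dataset from the thread's Messages case class and
// aggregate the distinct values per composite key with typed operators.
import spark.implicits._

val ds: Dataset[Messages] = spark.createDataset(messages)

val grouped: Dataset[(String, Seq[String])] =
  ds.groupByKey(x => x.timeStamp + "-" + x.Id)   // x.id here would be a compile error
    .mapGroups((key, rows) => (key, rows.map(_.value).toSeq.distinct))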
Re: combineByKey throws ClassCastException
This problem was caused by the fact that I had packaged the application with a Spark version (0.9.1) different from that of the cluster (0.9.0). When I used the jar matching the cluster (spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the application ran as expected.

2014-09-15 14:57 GMT+08:00 x <wasedax...@gmail.com>:

> How about this.
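A common guard against this class of failure is to build against the cluster's exact Spark version and mark it "provided", so the submitted jar never bundles a second copy of Spark. A hedged build.sbt sketch (the Cloudera repository URL and coordinates are assumptions for illustration):

// build.sbt sketch: pin the cluster's Spark version; "provided" keeps it
// out of the assembly so only the cluster's copy is on the classpath.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies +=
  "org.apache.spark" %% "spark-core" % "0.9.0-cdh5.0.1" % "provided"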
Re: combineByKey throws ClassCastException
How about this.

scala> val rdd2 = rdd.combineByKey(
     |   (v: Int) => v.toLong,
     |   (c: Long, v: Int) => c + v,
     |   (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at <console>:14

xj @ Tokyo

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao <xiaotao.cs@gmail.com> wrote:

> I followed an example presented in the tutorial Learning Spark
> (http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html)
> to compute the per-key average as follows:
>
> val Array(appName) = args
> val sparkConf = new SparkConf()
>   .setAppName(appName)
> val sc = new SparkContext(sparkConf)
>
> /*
>  * compute the per-key average of values
>  * results should be:
>  *   A : 5.8
>  *   B : 14
>  *   C : 60.6
>  */
> val rdd = sc.parallelize(List(
>   ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
>   ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
>   ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
>
> val avg = rdd.combineByKey(
>   (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer
>   (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
>   .map{case (s, t) => (s, t._1 / t._2.toFloat)}
>
> avg.collect.foreach(t => println(t._1 + " - " + t._2))
>
> When I submitted the application, an exception of *java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was thrown. The tutorial said that the first function of *combineByKey*, *(x: Int) => (x, 1)*, should take a single element from the source RDD and return an element of the desired type in the resulting RDD. In my application, I take a single element of type *Int* from the source RDD and return a tuple of type (*Int*, *Int*), which meets that requirement quite well. So why would such an exception be thrown?
>
> I'm using CDH 5.0 and Spark 0.9.
>
> Thanks.
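For reference, the same per-key average can be written with aggregateByKey, where the zero value pins the accumulator type up front. A hedged sketch assuming Spark 1.0 or later (aggregateByKey does not exist in the 0.9 line the poster is on):

// Sketch: per-key average via aggregateByKey; (0, 0) fixes the (sum, count)
// accumulator type, so no combiner type has to be inferred.
val avg = rdd.aggregateByKey((0, 0))(
    (acc, x) => (acc._1 + x, acc._2 + 1),                     // fold one value in
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))   // merge partial sums
  .mapValues { case (sum, count) => sum / count.toFloat }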
Re: combineByKey at ShuffledDStream.scala
The streaming program contains the following main stages:

1. Receive data from Kafka.
2. Preprocess the data. These are all map and filter stages.
3. Group by a field.
4. Process the groupBy results using map. Inside this processing, I use collect and count.

Thanks!

Bill

On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Can you give an idea of the streaming program?
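For readers of the archive: the "combineByKey at ShuffledDStream.scala" stage is the shuffle behind the group-by, and seeing only two tasks usually means only two shuffle partitions. A hedged sketch of the usual mitigations; the Event type and eventStream name are made up for illustration:

// Sketch: when the downstream work is an aggregation, reduceByKey combines
// values map-side before the shuffle, and an explicit partition count
// spreads the shuffle over more tasks than the default two.
case class Event(field: String, value: Long)   // hypothetical record type

val sums = eventStream                         // assumed DStream[Event] from Kafka
  .map(e => (e.field, e.value))
  .reduceByKey(_ + _, 32)                      // 32 shuffle partitions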
Re: combineByKey at ShuffledDStream.scala
Can you give an idea of the streaming program? What are the rest of the transformations you are doing on the input streams?

On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

> Hi all,
>
> I am currently running a Spark Streaming program, which consumes data from Kafka and does a group-by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named:
>
> combineByKey at ShuffledDStream.scala:42
>
> always takes the longest running time. And if I open this stage, I only see two executors on it. Does anyone have an idea what this stage does and how to increase its speed?
>
> Thanks!
>
> Bill
Re: combinebykey throw classcastexception
You asked off-list, and provided a more detailed example there:

val random = new Random()
val testdata = (1 to 1).map(_ => (random.nextInt(), random.nextInt()))
sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => { new ArrayBuffer[Int]() },
  (bucket: ArrayBuffer[Int], instant: Int) => { bucket += instant },
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => { bucket1 ++= bucket2 }
).collect()

https://www.quora.com/Why-is-my-combinebykey-throw-classcastexception

I can't reproduce this with Spark 0.9.0 / CDH5 or Spark 1.0.0 RC9. Your definition looks fine too. (Except that you are dropping the first value, but that's a different problem.)

On Tue, May 20, 2014 at 2:05 AM, xiemeilong <xiemeilong...@gmail.com> wrote:

> I am using CDH5 on a three-machine cluster. I map data from HBase as (String, V) pairs, then call combineByKey like this:
>
> .combineByKey[C](
>   (v: V) => new C(v),  // this line throws java.lang.ClassCastException: C cannot be cast to V
>   (c: C, v: V) => C,
>   (c1: C, c2: C) => C)
>
> I am very confused by this; there is no C-to-V cast at all. What's wrong?
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
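On the "dropping the first value" aside: createCombiner receives the first value seen for each key, so returning an empty buffer silently discards that value. A sketch of the corrected combiner, reusing the names from the example above:

import scala.collection.mutable.ArrayBuffer

// Sketch: seed the buffer with the first value instead of dropping it.
sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => ArrayBuffer(instant),                        // keep the first value
  (bucket: ArrayBuffer[Int], instant: Int) => bucket += instant,
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => bucket1 ++= bucket2
).collect()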
Re: combinebykey throw classcastexception
It turned out just now that this issue was caused by a version mismatch between the driver (0.9.1) and the server (0.9.0-cdh5.0.1). Other functions had worked fine before; only combineByKey failed. Thank you very much for your reply.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6060p6087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.