Re: combineByKey

2019-04-05 Thread Jason Nerothin
Take a look at this SOF:

https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Thank you for the details. That was a typo I made while composing the mail.
> Below is the actual flow.
>
> Any idea why combineByKey is not working? aggregateByKey works.
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
>  sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
> (x.Id, x.value))).aggregateByKey(Set[String]())(
> (aggr, value) => aggr ++ Set(value._2),
> (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap
>
>  print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
> wrote:
>
>> I broke some of your code down into the following lines:
>>
>> import spark.implicits._
>>
>> val a: RDD[Messages]= sc.parallelize(messages)
>> val b: Dataset[Messages] = a.toDF.as[Messages]
>> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp
>> + "-" + x.Id, (x.Id, x.value))}
>>
>> You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't
>> have the types you think they do for combineByKey.
>>
>> I recommend breaking the code down statement-by-statement like this
>> whenever you get into a dance with the Scala type system.
>>
>> The type-safety that you're after (that eventually makes life *easier*)
>> is best supported by Dataset (would have prevented the .id vs .Id error).
>> Although there are some performance tradeoffs vs RDD and DataFrame...
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is there any issue in the below code?
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t3"),
>>>   Messages(0, "d1", "t4"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t5"),
>>>   Messages(0, "d2", "t6"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t2")
>>> )
>>>
>>> //Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>>> String)) => accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>>> accumulator2
>>>
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile Error:-*
>>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>>  required: ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason
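
The compile error above comes from the arity of createCombiner. After the map,
the RDD's value type V is the tuple (String, String), so combineByKey expects
createCombiner: V => C, a one-argument function that takes the whole tuple; a
two-parameter definition such as (Id: String, value: String) => ... has type
(String, String) => Set[String], which is the "found" type in the error, while
the compiler requires ((String, String)) => ?. Below is a minimal sketch of a
version that should compile, reusing the Messages case class from the thread
(the function and value names here are illustrative, not from the original mail):

import org.apache.spark.rdd.RDD

case class Messages(timeStamp: Int, Id: String, value: String)

// Sketch: every function takes the value tuple (String, String) as one argument.
def perKeyValueSets(rdd: RDD[Messages]): Map[String, Set[String]] = {
  rdd
    .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
    .combineByKey[Set[String]](
      (v: (String, String)) => Set(v._2),                      // createCombiner: V => C
      (acc: Set[String], v: (String, String)) => acc + v._2,   // mergeValue: (C, V) => C
      (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2)  // mergeCombiners: (C, C) => C
    .collect()
    .toMap
}

Called on sc.parallelize(messages), this should produce the same Map as the
aggregateByKey version shown above.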


Re: combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi,

Thank you for the details. That was a typo I made while composing the mail.
Below is the actual flow.

Any idea why combineByKey is not working? aggregateByKey works.

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String,
String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
(accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
 sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*

val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
(x.Id, x.value))).aggregateByKey(Set[String]())(
(aggr, value) => aggr ++ Set(value._2),
(aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap

 print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
wrote:

> I broke some of your code down into the following lines:
>
> import spark.implicits._
>
> val a: RDD[Messages]= sc.parallelize(messages)
> val b: Dataset[Messages] = a.toDF.as[Messages]
> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
> "-" + x.Id, (x.Id, x.value))}
>
> You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't
> have the types you think they do for combineByKey.
>
> I recommend breaking the code down statement-by-statement like this
> whenever you get into a dance with the Scala type system.
>
> The type-safety that you're after (that eventually makes life *easier*) is
> best supported by Dataset (would have prevented the .id vs .Id error).
> Although there are some performance tradeoffs vs RDD and DataFrame...
>
>
>
>
>
>
> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> Is there any issue in the below code?
>>
>> case class Messages(timeStamp: Int, Id: String, value: String)
>>
>> val messages = Array(
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t3"),
>>   Messages(0, "d1", "t4"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t5"),
>>   Messages(0, "d2", "t6"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t2")
>> )
>>
>> //Defining createCombiner, mergeValue and mergeCombiner functions
>> def createCombiner = (id: String, value: String) => Set(value)
>>
>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>> String)) => accumulator1 ++ Set(accumulator2._2)
>>
>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>> accumulator2
>>
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> *Compile Error:-*
>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>  required: ((String, String)) => ?
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> Regards,
>> Rajesh
>>
>>
>
> --
> Thanks,
> Jason
>


Re: combineByKey

2019-04-05 Thread Jason Nerothin
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages]= sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
"-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have
the types you think they do for combineByKey.

I recommend breaking the code down statement-by-statement like this whenever
you get into a dance with the Scala type system.

The type-safety that you're after (that eventually makes life *easier*) is
best supported by Dataset (would have prevented the .id vs .Id error).
Although there are some performance tradeoffs vs RDD and DataFrame...






On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Is there any issue in the below code?
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh
>
>

-- 
Thanks,
Jason
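
For the type-safety route described above, a Dataset version of the same
aggregation is sketched below. It assumes a SparkSession named spark and the
Messages case class from the thread; groupByKey and mapGroups here are the
typed Dataset operators, and the helper name is illustrative:

import org.apache.spark.sql.SparkSession

case class Messages(timeStamp: Int, Id: String, value: String)

// Sketch: the typed key function would fail to compile on .id vs .Id,
// which is exactly the kind of mistake the Dataset API catches early.
def valueSetsPerKey(spark: SparkSession, messages: Seq[Messages]): Map[String, Set[String]] = {
  import spark.implicits._
  spark.createDataset(messages)
    .groupByKey(m => m.timeStamp + "-" + m.Id)
    .mapGroups((key, ms) => (key, ms.map(_.value).toSeq.distinct))
    .collect()
    .map { case (k, vs) => k -> vs.toSet }
    .toMap
}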


Re: combineByKey throws ClassCastException

2014-09-16 Thread Tao Xiao
This problem was caused by the fact that I used a package jar with a Spark
version (0.9.1) different from that of the cluster (0.9.0). When I used the
correct package jar
(spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the
application ran as expected.
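
One way to avoid this class of mismatch is to build the application against
exactly the Spark artifact the cluster runs and mark it "provided", so the
cluster's own jars are the only Spark on the classpath at runtime. A sketch of
the relevant sbt lines follows; the repository URL and the exact version
string are assumptions and should be taken from the actual cluster:

// build.sbt (sketch): pin Spark to the cluster's version and do not bundle it.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-cdh5.0.1" % "provided"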



2014-09-15 14:57 GMT+08:00 x wasedax...@gmail.com:

 How about this.

 scala> val rdd2 = rdd.combineByKey(
  | (v: Int) => v.toLong,
  | (c: Long, v: Int) => c + v,
  | (c1: Long, c2: Long) => c1 + c2)
 rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
 combineByKey at <console>:14

 xj @ Tokyo

 On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I followed an example presented in the tutorial Learning Spark
 http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 to compute the per-key average as follows:


 val Array(appName) = args
 val sparkConf = new SparkConf()
 .setAppName(appName)
 val sc = new SparkContext(sparkConf)
 /*
  * compute the per-key average of values
  * results should be:
  *A : 5.8
  *B : 14
  *C : 60.6
  */
 val rdd = sc.parallelize(List(
 ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
 ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
 ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
 val avg = rdd.combineByKey(
 (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
 cannot be cast to java.lang.Integer
 (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
 acc2._2))
 .map{ case (s, t) => (s, t._1 / t._2.toFloat) }
 avg.collect.foreach(t => println(t._1 + " - " + t._2))



 When I submitted the application, an exception of
 *java.lang.ClassCastException:
 scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was
 thrown. The tutorial said that the first function of *combineByKey*, *(x: Int)
 => (x, 1)*, should take a single element in the source RDD and return an
 element of the desired type in the resulting RDD. In my application, we
 take a single element of type *Int* from the source RDD and return a
 tuple of type (*Int*, *Int*), which meets the requirements quite well.
 But why would such an exception be thrown?

 I'm using CDH 5.0 and Spark 0.9

 Thanks.






Re: combineByKey throws ClassCastException

2014-09-15 Thread x
How about this.

scala> val rdd2 = rdd.combineByKey(
 | (v: Int) => v.toLong,
 | (c: Long, v: Int) => c + v,
 | (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
combineByKey at <console>:14

xj @ Tokyo

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I followed an example presented in the tutorial Learning Spark
 http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 to compute the per-key average as follows:


 val Array(appName) = args
 val sparkConf = new SparkConf()
 .setAppName(appName)
 val sc = new SparkContext(sparkConf)
 /*
  * compute the per-key average of values
  * results should be:
  *A : 5.8
  *B : 14
  *C : 60.6
  */
 val rdd = sc.parallelize(List(
 ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
 ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
 ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
 val avg = rdd.combineByKey(
 (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
 cannot be cast to java.lang.Integer
 (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
 acc2._2))
 .map{ case (s, t) => (s, t._1 / t._2.toFloat) }
 avg.collect.foreach(t => println(t._1 + " - " + t._2))



 When I submitted the application, an exception of
 *java.lang.ClassCastException:
 scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was
 thrown. The tutorial said that the first function of *combineByKey*, *(x: Int)
 => (x, 1)*, should take a single element in the source RDD and return an
 element of the desired type in the resulting RDD. In my application, we
 take a single element of type *Int* from the source RDD and return a
 tuple of type (*Int*, *Int*), which meets the requirements quite well.
 But why would such an exception be thrown?

 I'm using CDH 5.0 and Spark 0.9

 Thanks.





Re: combineByKey at ShuffledDStream.scala

2014-07-23 Thread Bill Jay
The streaming program contains the following main stages:

1. receive data from Kafka
2. preprocessing of the data. These are all map and filtering stages.
3. Group by a field
4. Process the groupBy results using map. Inside this processing, I use
collect, count.

Thanks!

Bill


On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Can you give an idea of the streaming program? Rest of the transformation
 you are doing on the input streams?


 On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently running a Spark Streaming program, which consumes data
 from Kafka and does a group-by operation on the data. I am trying to optimize
 the running time of the program because it looks slow to me. It seems the
 stage named:

 * combineByKey at ShuffledDStream.scala:42 *

 always takes the longest running time. And if I open this stage, I only
 see two executors on it. Does anyone have an idea what this stage
 does and how to increase its speed? Thanks!

 Bill
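
Seeing only two tasks on the combineByKey stage usually means the shuffle
behind the group-by is running with a very small partition count. Both
groupByKey and reduceByKey on pair DStreams accept an explicit numPartitions,
so one thing to try is to raise it. A sketch of the idea, where the stream
name, the key function and the count of 32 partitions are all illustrative:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Sketch: give the shuffle behind the group-by more partitions so that more
// executors participate in the "combineByKey at ShuffledDStream" stage.
def groupWithMoreTasks(lines: DStream[String],
                       extractKey: String => String): DStream[(String, Iterable[String])] = {
  val keyed = lines.map(record => (extractKey(record), record))
  keyed.groupByKey(32) // number of reduce tasks for the shuffle stage
}

If the downstream processing is really an aggregation (counts, sums), replacing
groupByKey with reduceByKey or combineByKey at the same partition count also
lets Spark combine values on the map side and shuffle far less data.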





Re: combineByKey at ShuffledDStream.scala

2014-07-22 Thread Tathagata Das
Can you give an idea of the streaming program? Rest of the transformation
you are doing on the input streams?


On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I am currently running a Spark Streaming program, which consumes data from
 Kafka and does a group-by operation on the data. I am trying to optimize the
 running time of the program because it looks slow to me. It seems the stage
 named:

 * combineByKey at ShuffledDStream.scala:42 *

 always takes the longest running time. And if I open this stage, I only
 see two executors on it. Does anyone have an idea what this stage
 does and how to increase its speed? Thanks!

 Bill



Re: combinebykey throw classcastexception

2014-05-20 Thread Sean Owen
You asked off-list, and provided a more detailed example there:

val random = new Random()
val testdata = (1 to 1).map(_ => (random.nextInt(), random.nextInt()))
sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => { new ArrayBuffer[Int]() },
  (bucket: ArrayBuffer[Int], instant: Int) => { bucket += instant },
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => { bucket1 ++= bucket2 }
).collect()

https://www.quora.com/Why-is-my-combinebykey-throw-classcastexception

I can't reproduce this with Spark 0.9.0  / CDH5 or Spark 1.0.0 RC9.
Your definition looks fine too. (Except that you are dropping the
first value, but that's a different problem.)
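
The parenthetical about dropping the first value refers to createCombiner in
the snippet above: it returns an empty buffer and never appends the instant
that triggered it, so each key silently loses one element. A sketch of the
corrected shape, reusing sc and testdata from the snippet above:

import scala.collection.mutable.ArrayBuffer

// Sketch: createCombiner seeds the buffer with the first value instead of
// discarding it; the other two functions are unchanged.
val grouped = sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => ArrayBuffer(instant),
  (bucket: ArrayBuffer[Int], instant: Int) => bucket += instant,
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => bucket1 ++= bucket2
).collect()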

On Tue, May 20, 2014 at 2:05 AM, xiemeilong xiemeilong...@gmail.com wrote:
 I am using CDH5 on a three-machine cluster. I map data from HBase as (String,
 V) pairs, then call combineByKey like this:

 .combineByKey[C](
   (v: V) => new C(v),   // this line throws java.lang.ClassCastException: C
 cannot be cast to V
   (c: C, v: V) => c,
   (c1: C, c2: C) => c1)


 I am very confused by this; there isn't any C-to-V casting at all. What's
 wrong?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: combinebykey throw classcastexception

2014-05-20 Thread xiemeilong
This issue turned out to be caused by a version mismatch between the driver
(0.9.1) and the server (0.9.0-cdh5.0.1). Other functions worked fine, but
combineByKey did not.

Thank you very much for your reply.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6060p6087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.