How to monitor the throughput and latency of the combineByKey transformation in Spark 3?

2020-07-20 Thread Felipe Gutierrez
Hi community, I built a simple count-and-sum Spark application which uses the combineByKey transformation [1] and I would like to monitor the throughput in/out of this transformation and the latency that combineByKey spends pre-aggregating tuples. Ideally, the latency I would like to take
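
One way to approximate both measurements: count records with an accumulator inside the combiner functions, and read stage latency off the listener bus. A minimal sketch, assuming a pair RDD named pairs of (String, Long) values; accumulator counts are only approximate under task retries:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // records flowing into the pre-aggregation: createCombiner sees the first
    // value per key per partition, mergeValue sees every subsequent one
    val recordsIn = sc.longAccumulator("combineByKey.recordsIn")

    val summed = pairs.combineByKey(
      (v: Long) => { recordsIn.add(1); (v, 1L) },
      (acc: (Long, Long), v: Long) => { recordsIn.add(1); (acc._1 + v, acc._2 + 1L) },
      (a: (Long, Long), b: (Long, Long)) => (a._1 + b._1, a._2 + b._2))

    // coarse latency: how long each stage (including the combine stage) ran
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        val info = event.stageInfo
        val ms = for (s <- info.submissionTime; c <- info.completionTime) yield c - s
        println(s"stage ${info.stageId} (${info.name}): ${ms.getOrElse(-1L)} ms")
      }
    })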

Re: combineByKey

2019-04-05 Thread Jason Nerothin
posing the mail. > Below is the actual flow. > > Any idea why the combineByKey is not working? aggregateByKey is working. > > // Defining createCombiner, mergeValue and mergeCombiner functions > > def createCombiner = (Id: String, value: String) => (value :: Nil).toSet >

Re: combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi, thank you for the details. It was a typo while composing the mail. Below is the actual flow. Any idea why the combineByKey is not working? aggregateByKey is working. // Defining createCombiner, mergeValue and mergeCombiner functions def createCombiner = (Id: String, value: String

Re: combineByKey

2019-04-05 Thread Jason Nerothin
uot;t2"), > Messages(0, "d3", "t1"), > Messages(0, "d3", "t1"), > Messages(0, "d3", "t2") > ) > > //Defining createCombiner, mergeValue and mergeCombiner functions > def createCombiner = (

combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
ges(0, "d3", "t1"), Messages(0, "d3", "t2") ) //Defining createCombiner, mergeValue and mergeCombiner functions def createCombiner = (id: String, value: String) => Set(value) def mergeValue0 = (accumulator1: Set[String], accumulator2: (String

Re: Inconsistent results with combineByKey API

2017-09-05 Thread Swapnil Shinde
Ping. Can someone please confirm whether this is an issue or not? - Swapnil On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde <swapnilushi...@gmail.com> wrote: > Hello All > > I am observing some strange results with the aggregateByKey API, which is > implemented with comb

Inconsistent results with combineByKey API

2017-08-31 Thread Swapnil Shinde
Hello All, I am observing some strange results with the aggregateByKey API, which is implemented with combineByKey. Not sure if this is by design or a bug. I created this toy example, but the same problem can be observed on large datasets as well: case class ABC(key: Int, c1: Int, c2: Int) case class
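
One frequent cause of such inconsistencies, sketched on a toy example (not the poster's case classes): the seqOp/combOp pair must be commutative and associative, otherwise the result depends on how records land in partitions and in what order buffers are merged:

    val nums = sc.parallelize(Seq((1, 10), (1, 3), (1, 2)), 3)
    val unstable = nums.aggregateByKey(0)(_ - _, _ - _) // order-dependent: varies with partitioning
    val stable   = nums.aggregateByKey(0)(_ + _, _ + _) // commutative and associative: deterministic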

Partitioning Data to optimize combineByKey

2016-06-02 Thread Nathan Case
of that processing is to mapToPair to use each row's id as the key in a tuple. Then the data goes through a combineByKey operation to group all values with the same key. This operation always exceeds the maximum cluster memory and the job eventually fails. While it is shuffling there is a lot of "spi
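
Two levers usually help here: raise the partition count so each partition's combined values fit in memory, and pre-partition by the same key so combineByKey can combine without a second shuffle. A sketch, assuming a pair RDD byId keyed by row id; 400 is a placeholder partition count:

    import org.apache.spark.HashPartitioner

    val partitioned = byId.partitionBy(new HashPartitioner(400)).persist()

    // combineByKey sees the existing partitioner and combines in place, without reshuffling
    val grouped = partitioned.combineByKey(
      (v: String) => List(v),
      (acc: List[String], v: String) => v :: acc,
      (a: List[String], b: List[String]) => a ::: b)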

Re: Datasets combineByKey

2016-04-10 Thread Koert Kuipers
I'm mapping the RDD API to the Datasets API and I was wondering if I was missing >> something or if this functionality is missing. >> >> >> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Haven't found any JIRA w.r.t. combineByKey for

Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
y is missing. > > > On Sun, Apr 10, 2016 at 3:00 PM Ted Yu <yuzhih...@gmail.com> wrote: > >> Haven't found any JIRA w.r.t. combineByKey for Dataset. >> >> What's your use case ? >> >> Thanks >> >> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela <amit

Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I'm mapping the RDD API to the Datasets API and I was wondering if I was missing something or if this functionality is missing. On Sun, Apr 10, 2016 at 3:00 PM Ted Yu <yuzhih...@gmail.com> wrote: > Haven't found any JIRA w.r.t. combineByKey for Dataset. > > What's your use case? >

Re: Datasets combineByKey

2016-04-10 Thread Ted Yu
Haven't found any JIRA w.r.t. combineByKey for Dataset. What's your use case? Thanks On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela <amitsel...@gmail.com> wrote: > Is there (planned?) combineByKey support for Dataset? > Is / will there be support for combiner lifting? > > Thanks, > Amit >

Datasets combineByKey

2016-04-09 Thread Amit Sela
Is there (planned?) combineByKey support for Dataset? Is / will there be support for combiner lifting? Thanks, Amit
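
The closest Dataset counterpart is the typed Aggregator, whose zero/reduce/merge methods mirror createCombiner/mergeValue/mergeCombiners and which can be evaluated as a partial (map-side) aggregation, i.e. the combiner lifting asked about. A minimal sketch with a hypothetical Sale type:

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    case class Sale(item: String, amount: Double)

    object SumAmount extends Aggregator[Sale, Double, Double] {
      def zero: Double = 0.0                                    // initial combiner
      def reduce(buf: Double, s: Sale): Double = buf + s.amount // mergeValue
      def merge(b1: Double, b2: Double): Double = b1 + b2       // mergeCombiners
      def finish(r: Double): Double = r
      def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
      def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

    // usage: ds.groupByKey(_.item).agg(SumAmount.toColumn)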

RE: aggregateByKey vs combineByKey

2016-01-05 Thread LINChen
Subject: aggregateByKey vs combineByKey From: mmistr...@gmail.com To: user@spark.apache.org Hi all, I have the following dataset kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)] It's a simple list of tuples containing (word_length, word). What I wanted to do was to group the result

Re: aggregateByKey vs combineByKey

2016-01-05 Thread Ted Yu
Looking at PairRDDFunctions.scala : def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { ... combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp,

aggregateByKey vs combineByKey

2016-01-05 Thread Marco Mistroni
Hi all, I have the following dataset kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)] It's a simple list of tuples containing (word_length, word). What I wanted to do was to group the result by key in order to have a result in the form [(word_length_1, [word1, word2, word3],
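
For the record, both shapes of the grouping, as a sketch assuming a SparkContext named sc:

    val kv = sc.parallelize(Seq((2, "Hi"), (1, "i"), (2, "am"), (1, "a"), (4, "test"), (6, "string")))

    // aggregateByKey: a zero value, how to fold a value in, and how to merge buffers
    val viaAggregate = kv.aggregateByKey(List.empty[String])(
      (acc, word) => word :: acc, // seqOp, runs within each partition
      (a, b) => a ::: b)          // combOp, merges per-partition buffers

    // combineByKey: spell out how the first value becomes a buffer instead of giving a zero
    val viaCombine = kv.combineByKey(
      (word: String) => List(word),
      (acc: List[String], word: String) => word :: acc,
      (a: List[String], b: List[String]) => a ::: b)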

Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Xianjin YE
, Aniket Bhatnagar wrote: I am aggregating a dataset using the combineByKey method and for a certain input size, the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you guys have any idea of what could

OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am aggregating a dataset using the combineByKey method and for a certain input size, the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you guys have any idea of what could possibly result

Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
the hashCode and equals methods correctly. On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote: I am aggregating a dataset using the combineByKey method and for a certain input size, the job fails with the following error. I have enabled heap dumps to better analyze the issue
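
The advice being quoted: combineByKey hashes and compares keys constantly, so a custom key type must define equals and hashCode consistently with each other. In Scala the cheapest route is a case class, which derives both; a sketch with a hypothetical key type:

    // equals/hashCode derived automatically; safe to use as a combineByKey key
    case class DeviceKey(tenant: String, deviceId: Long)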

CombineByKey - Please explain its working

2015-03-24 Thread ashish.usoni
I am reading about combineByKey and going through the example below from a blog post, but I can't understand how it works step by step. Can someone please explain? case class Fruit(kind: String, weight: Int) { def makeJuice: Juice = Juice(weight * 100) } case
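
A sketch of how the blog example fits together (the Juice class and its add method are reconstructed here), with the role each of the three functions plays:

    case class Juice(volume: Int) { def add(other: Juice): Juice = Juice(volume + other.volume) }
    case class Fruit(kind: String, weight: Int) { def makeJuice: Juice = Juice(weight * 100) }

    val fruits = sc.parallelize(Seq(Fruit("apple", 5), Fruit("apple", 3), Fruit("orange", 4)))

    val juicePerKind = fruits.map(f => (f.kind, f)).combineByKey(
      (f: Fruit) => f.makeJuice,                  // createCombiner: first fruit for a key in a partition
      (j: Juice, f: Fruit) => j.add(f.makeJuice), // mergeValue: fold each further fruit into the juice
      (j1: Juice, j2: Juice) => j1.add(j2))       // mergeCombiners: merge partial juices across partitions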

retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Thomas Kwan
Hi there, We are using mllib 1.1.1, and doing Logistic Regression with a dataset of about 150M rows. The training part usually goes pretty smoothly without any retries. But during the prediction stage and BinaryClassificationMetrics stage, I am seeing retries with fetch failure errors. The

Re: retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Xiangrui Meng
Sean's PR may be relevant to this issue (https://github.com/apache/spark/pull/3702). As a workaround, you can try to truncate the raw scores to 4 digits (e.g., 0.5643215 -> 0.5643) before sending them to BinaryClassificationMetrics. This may not work well if the score distribution is very skewed. See
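
A sketch of that workaround, assuming an RDD of (score, label) pairs named scoreAndLabels; later Spark releases expose the same idea directly through the metrics class's numBins constructor argument:

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

    val truncated = scoreAndLabels.map { case (score, label) =>
      (math.floor(score * 10000) / 10000, label) // keep 4 decimal digits per score
    }
    val metrics = new BinaryClassificationMetrics(truncated)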

Re: retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Sean Owen
Yes, my change is slightly downstream of this point in the processing though. The code is still creating a counter for each distinct score value, and then binning. I don't think that would cause a failure - just might be slow. At the extremes, you might see 'fetch failure' as a symptom of things

Re: Help with using combineByKey

2014-10-10 Thread Davies Liu
Maybe this version is easier to use: plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) It has similar behavior to combineByKey() and will be faster than the groupByKey() version. On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy

Re: Help with using combineByKey

2014-10-10 Thread HARIPRIYA AYYALASOMAYAJULA
similar behavior to combineByKey() and will be faster than the groupByKey() version. On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Sean, thank you. It works. But I am still confused about the function. Can you kindly throw some light on it? I was going

Re: Help with using combineByKey

2014-10-09 Thread Sean Owen
,1). Try v => (1,1) On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: I am a beginner to Spark and finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations. My input: val

Re: Help with using combineByKey

2014-10-09 Thread Yana Kadiyska
If you just want the ratio of positive to all values per key (if I'm reading right) this works: val reduced = input.groupByKey().map(grp => grp._2.filter(v => v > 0).size.toFloat / grp._2.size) reduced.foreach(println) I don't think you need reduceByKey or combineByKey as you're not doing anything where

Re: Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
read that it is ideal to use combineByKey for complex reduce operations. My input: val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7), ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0
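
If combineByKey is the tool of choice, one shape of the solution, sketched with (positives, total) pairs as the combiner:

    val ratios = input.combineByKey(
      (v: Int) => (if (v > 0) 1 else 0, 1),           // createCombiner: first value for a key
      (acc: (Int, Int), v: Int) =>
        (acc._1 + (if (v > 0) 1 else 0), acc._2 + 1), // mergeValue: fold a value into (pos, total)
      (a: (Int, Int), b: (Int, Int)) =>
        (a._1 + b._1, a._2 + b._2)                    // mergeCombiners: merge partition results
    ).mapValues { case (pos, total) => pos.toFloat / total }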

Re: Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
more about these functions? It would be helpful if I could get a chance to look at more examples. Also, I assume using combineByKey helps us solve it in parallel, unlike the plain Scala functions Yana mentioned. Am I correct? On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com

Re: Help with using combineByKey

2014-10-09 Thread Sean Owen
, whereas your combineByKey does not. On Fri, Oct 10, 2014 at 5:28 AM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Sean, Thank you. It works. But I am still confused about the function. Can you kindly throw some light on it? I was going through the example mentioned in https

aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
Hi All, After some hair pulling, I've reached the realisation that an operation I am currently doing via: myRDD.groupByKey.mapValues(func) should be done more efficiently using aggregateByKey or combineByKey. Both of these methods would do, and they seem very similar to me in terms

Re: aggregateByKey vs combineByKey

2014-09-29 Thread Liquan Pei
Hi Dave, You can replace groupByKey with reduceByKey to improve performance in some cases. reduceByKey performs map-side combine, which can reduce network IO and shuffle size, whereas groupByKey will not perform map-side combine. combineByKey is more general than aggregateByKey. Actually

Re: aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
as groupByKey will not perform map-side combine. combineByKey is more general than aggregateByKey. Actually, the implementation of aggregateByKey, reduceByKey and groupByKey is achieved by combineByKey. aggregateByKey is similar to reduceByKey but you can provide initial values when performing
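
To make the difference concrete, a per-key mean written both ways, as a sketch:

    val nums = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // groupByKey ships every value across the network, then reduces per key
    val viaGroup = nums.groupByKey.mapValues(vs => vs.sum / vs.size)

    // aggregateByKey ships only one (sum, count) pair per key per partition
    val viaAgg = nums.aggregateByKey((0.0, 0L))(
      (acc, v) => (acc._1 + v, acc._2 + 1L),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).mapValues { case (sum, count) => sum / count }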

Re: combineByKey throws ClassCastException

2014-09-16 Thread Tao Xiao
was thrown. The tutorial said that the first function of combineByKey, (x: Int) => (x, 1), should take a single element in the source RDD and return an element of the desired type in the resulting RDD. In my application, we take a single element of type Int from the source RDD

Re: combineByKey throws ClassCastException

2014-09-15 Thread x
that the first function of combineByKey, (x: Int) => (x, 1), should take a single element in the source RDD and return an element of the desired type in the resulting RDD. In my application, we take a single element of type Int from the source RDD and return a tuple of type (Int, Int), which
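
The tutorial pattern under discussion, written out with explicit types (a sketch of the usual per-key sum-and-count example):

    import org.apache.spark.rdd.RDD

    val src: RDD[(String, Int)] = sc.parallelize(Seq(("a", 88), ("b", 95), ("a", 91)))

    // C = (Int, Int): each Int value is lifted into a (sum, count) combiner
    val combined: RDD[(String, (Int, Int))] = src.combineByKey(
      (x: Int) => (x, 1),
      (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))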

Re: combineByKey at ShuffledDStream.scala

2014-07-23 Thread Bill Jay
a Spark Streaming program, which consumes data from Kafka and does a group-by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named combineByKey at ShuffledDStream.scala:42 always takes the longest running time

combineByKey at ShuffledDStream.scala

2014-07-22 Thread Bill Jay
Hi all, I am currently running a Spark Streaming program, which consumes data from Kafka and does a group-by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named combineByKey at ShuffledDStream.scala:42 always
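
When the combine stage runs on only a couple of executors, the usual first fix is to raise the shuffle parallelism explicitly. A sketch, assuming a DStream of (String, Int) pairs named pairs; 64 is a placeholder partition count:

    // groupByKey/reduceByKey on a DStream accept an explicit partition count;
    // the default can leave most executors idle
    val grouped = pairs.groupByKey(64)

    // prefer reduceByKey where possible: it combines map-side before the shuffle
    val counts = pairs.reduceByKey(_ + _, 64)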

Re: combineByKey at ShuffledDStream.scala

2014-07-22 Thread Tathagata Das
the group-by operation on the data. I try to optimize the running time of the program because it looks slow to me. It seems the stage named combineByKey at ShuffledDStream.scala:42 always takes the longest running time. And if I open this stage, I only see two executors on this stage. Does

Re: When to use CombineByKey vs reduceByKey?

2014-06-12 Thread Diana Hu
Matei, thanks for the answer, this clarifies things very much. Based on my usage I would use combineByKey, since the output is another custom data structure. I found my issues with combineByKey were resolved after doing more tuning of the level of parallelism. I've found that it really

Re: When to use CombineByKey vs reduceByKey?

2014-06-11 Thread Matei Zaharia
combineByKey is designed for when your return type from the aggregation is different from the values being aggregated (e.g. you group together objects), and it should allow you to modify the leftmost argument of each function (mergeCombiners, mergeValue, etc) and return that instead
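
A sketch of that pattern: the result type differs from the value type, and the leftmost buffer is mutated and returned to avoid allocating per record:

    import scala.collection.mutable.ArrayBuffer

    val pairs = sc.parallelize(Seq(("k", 1), ("k", 2), ("j", 3)))

    val buffered = pairs.combineByKey(
      (v: Int) => ArrayBuffer(v),                           // build a new combiner from the first value
      (buf: ArrayBuffer[Int], v: Int) => { buf += v; buf }, // mutate the left argument and return it
      (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => { b1 ++= b2; b1 })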

Re: combinebykey throw classcastexception

2014-05-20 Thread Sean Owen
You asked off-list, and provided a more detailed example there: val random = new Random() val testdata = (1 to 1).map(_ => (random.nextInt(), random.nextInt())) sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]]( (instant: Int) => { new ArrayBuffer[Int

Re: combinebykey throw classcastexception

2014-05-20 Thread xiemeilong
It turned out that this issue was caused by a version mismatch between the driver (0.9.1) and the server (0.9.0-cdh5.0.1). Other functions worked fine, but combineByKey did not. Thank you very much for your reply.

combinebykey throw classcastexception

2014-05-19 Thread xiemeilong
I am using CDH5 on a three-machine cluster. I map data from HBase as (String, V) pairs, then call combineByKey like this: .combineByKey[C]( (v: V) => new C(v), // this line throws java.lang.ClassCastException: C cannot be cast to V (v: C, v: V) => C, (c1: C, c2: C) => C) I am very

Re: Use combineByKey and StatCount

2014-04-14 Thread dachuan
1, 2014 at 10:55 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, can someone give me some tips to compute the mean of an RDD by key, maybe with combineByKey and StatCount. Cheers, Jaonary

Use combineByKey and StatCount

2014-04-01 Thread Jaonary Rabarisoa
Hi all, can someone give me some tips to compute the mean of an RDD by key, maybe with combineByKey and StatCount. Cheers, Jaonary
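
A sketch of one way to do this; StatCount is presumably org.apache.spark.util.StatCounter, which tracks count, mean and variance and merges cleanly inside combineByKey:

    import org.apache.spark.util.StatCounter

    val data = sc.parallelize(Seq(("a", 1.0), ("a", 2.0), ("b", 4.0)))

    val statsPerKey = data.combineByKey(
      (v: Double) => StatCounter(v),             // start a counter from the first value
      (s: StatCounter, v: Double) => s.merge(v), // merge mutates and returns the counter
      (s1: StatCounter, s2: StatCounter) => s1.merge(s2))

    val meanPerKey = statsPerKey.mapValues(_.mean)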