How to use groupByKey() in spark structured streaming without aggregates
Is there a way to use the groupByKey() function in Spark Structured Streaming without aggregates? I have a scenario where we would like to group items by a key without applying any aggregates. On the incoming data I would like to apply groupByKey on the field "device_id", so that all records for a device end up in a single grouped output record. I have also tried using collect_list() in the aggregate expression after groupByKey, but that takes more time to process the datasets. Also, since we are aggregating, we can only use the 'Complete' or 'Update' output modes, while 'Append' mode looks more suitable for our use case. I have also looked at the groupByKey(numPartitions) and reduceByKey() functions available in the direct DStream API, which give results of the form (String, Iterable[(String, Int)]) without doing any aggregates. Is there something similar available in Structured Streaming?
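For reference, a minimal Scala sketch of the kind of thing the Dataset API allows here: plain mapGroups/flatMapGroups are not supported on streaming Datasets, but flatMapGroupsWithState can emit one grouped record per key per trigger and is allowed with the Append output mode. The case classes, the socket source and the field names below are made-up stand-ins, not the poster's actual schema.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class DeviceReading(device_id: String, payload: String)
case class GroupedReadings(device_id: String, payloads: Seq[String])

object GroupWithoutAggregate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("group-without-aggregate").getOrCreate()
    import spark.implicits._

    // Assumed toy source: "device_id,payload" lines on a socket.
    val readings = spark.readStream.format("socket")
      .option("host", "localhost").option("port", 9999).load()
      .as[String]
      .map { line => val Array(id, payload) = line.split(",", 2); DeviceReading(id, payload) }

    // One grouped record per device per trigger; no state is kept between triggers here.
    val grouped = readings
      .groupByKey(_.device_id)
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
        (deviceId: String, rows: Iterator[DeviceReading], state: GroupState[Int]) =>
          Iterator(GroupedReadings(deviceId, rows.map(_.payload).toSeq))
      }

    grouped.writeStream.outputMode("append").format("console").start().awaitTermination()
  }
}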
How to use Dataset foreachPartition and groupByKey together
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4. I have a Spark streaming application which listens to a Kafka topic, does some transformation and writes to an Oracle database using a JDBC client.

Step 1. Read events from Kafka as shown below:

Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
  .option("kafka.bootstrap.servers", strKafkaAddress)
  .option("assign", strSubscription)
  .option("maxOffsetsPerTrigger", "10")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .load()
  .filter(strFilter)
  .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
  .select("events.*");

I do groupByKey and then, for each group, take the set of events obtained for that group, create a JDBC connection/PreparedStatement, insert and then close the connection. I am using Oracle JDBC along with flatMapGroupsWithState.

Step 2. groupByKey and flatMapGroupsWithState:

Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
  .groupByKey(
    new MapFunction<Row, String>() {
      @Override public String call(Row event) {
        return event.getAs(m_InsightRawEvent.getPrimaryKey());
      }
    }, Encoders.STRING())
  .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
    Encoders.bean(InsightEventInfo.class), Encoders.bean(InsightEventUpdate.class),
    GroupStateTimeout.ProcessingTimeTimeout());

This has a drawback: it creates a connection and inserts into the DB for each group. I need to do it per partition, so that only one connection and one batch insert are done for all the new events read from that partition.

Can somebody point me to how I can achieve this? Basically I am looking for the following:
1. Read from Kafka as above.
2. kafkaEvents.foreachPartition - create one connection here.
3. groupByKey and flatMapGroupsWithState.

thanks
Robin Kuttaiah
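One common way to get exactly one connection and one batch per partition per trigger in Spark 2.3 is a ForeachWriter sink: open() is called once per partition, process() once per row, and close() once at the end of the partition. A hedged Scala sketch follows; the JDBC URL, credentials, table name and the InsightEventUpdate fields are placeholders, not the poster's real ones.

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.ForeachWriter

case class InsightEventUpdate(id: String, payload: String)   // stand-in for the real bean

class OraclePartitionWriter extends ForeachWriter[InsightEventUpdate] {
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  // Called once per partition per trigger: one connection per partition.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    conn = DriverManager.getConnection("jdbc:oracle:thin:@//dbhost:1521/svc", "user", "pass")
    conn.setAutoCommit(false)
    stmt = conn.prepareStatement("INSERT INTO insight_events (id, payload) VALUES (?, ?)")
    true
  }

  // Called once per row: accumulate a single batch for the whole partition.
  override def process(value: InsightEventUpdate): Unit = {
    stmt.setString(1, value.id)
    stmt.setString(2, value.payload)
    stmt.addBatch()
  }

  // Called once at the end of the partition: execute the batch and release the connection.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) { stmt.executeBatch(); conn.commit() }
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// Hypothetical usage with the sessionUpdates Dataset from the post above:
// sessionUpdates.writeStream.outputMode("append").foreach(new OraclePartitionWriter).start()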
Re: Replacing groupByKey() with reduceByKey()
Hi Santhosh, My name is not Bipin, its Biplob as is clear from my Signature. Regarding your question, I have no clue what your map operation is doing on the grouped data, so I can only suggest you to do : dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).reduceByKey(build_edges, 25) Although based on the return type you would have to modify your build_edges function. Thanks & Regards Biplob Biswas On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB wrote: > Hey Bipin, > Thanks for the reply, I am actually aggregating after the groupByKey() > operation, > I have posted the wrong code snippet in my first email. Here is what I am > doing > > dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: > (x[0],x)).groupByKey(25).map(build_edges) > > Can we replace reduceByKey() in this context ? > > Santhosh > > > On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas > wrote: > >> Hi Santhosh, >> >> If you are not performing any aggregation, then I don't think you can >> replace your groupbykey with a reducebykey, and as I see you are only >> grouping and taking 2 values of the result, thus I believe you can't just >> replace your groupbykey with that. >> >> Thanks & Regards >> Biplob Biswas >> >> >> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: >> >>> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark >>> and python newbie and I am having a hard time figuring out the lambda >>> function for the reduceByKey() operation. >>> >>> Here is the code >>> >>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: >>> (x[0],x)).groupByKey(25).take(2) >>> >>> Here is the return value >>> >>> >>> dd[(u'KEY_1', >> >>> 0x107be0c50>), (u'KEY_2', >> >>> at 0x107be0c10>)] >>> >>> and Here are the iterable contents dd[0][1] >>> >>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', >>> value=u'e7dc1f2a')Row(key=u'KEY_1', >>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', >>> value=u'fb0bc953')...Row(key=u'KEY_1', >>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', >>> value=u'd39714d3')Row(key=u'KEY_1', >>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') >>> >>> My question is how do replace with reduceByKey() and get the same >>> output as above? >>> >>> Santhosh >>> >> >
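The thread above is PySpark, but the RDD pattern is the same in Scala, so here is a hedged Scala sketch of how reduceByKey can reproduce groupByKey's per-key collections: wrap each value in a one-element buffer and concatenate. Note the caveat that this shuffles roughly as much data as groupByKey does; it only pays off when the per-group function (build_edges in the thread) can be rewritten as a true pairwise reduce that shrinks data map-side. The sample rows below are made up.

import org.apache.spark.sql.SparkSession

object GroupViaReduce {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("group-via-reduce").master("local[*]")
      .getOrCreate().sparkContext

    val rows = sc.parallelize(Seq(
      ("KEY_1", "deec95d6"), ("KEY_1", "f8891048"), ("KEY_2", "1b9d2bb2")))

    // groupByKey-style result via reduceByKey: one (key, all values) pair per key, 25 partitions.
    val groupedViaReduce = rows
      .mapValues(v => Vector(v))     // start a one-element buffer per record
      .reduceByKey(_ ++ _, 25)       // concatenate buffers; per-key order may differ from groupByKey

    groupedViaReduce.collect().foreach(println)
  }
}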
Re: Replacing groupByKey() with reduceByKey()
Hey Bipin, Thanks for the reply, I am actually aggregating after the groupByKey() operation, I have posted the wrong code snippet in my first email. Here is what I am doing dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).map(build_edges) Can we replace reduceByKey() in this context ? Santhosh On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas wrote: > Hi Santhosh, > > If you are not performing any aggregation, then I don't think you can > replace your groupbykey with a reducebykey, and as I see you are only > grouping and taking 2 values of the result, thus I believe you can't just > replace your groupbykey with that. > > Thanks & Regards > Biplob Biswas > > > On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: > >> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark >> and python newbie and I am having a hard time figuring out the lambda >> function for the reduceByKey() operation. >> >> Here is the code >> >> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: >> (x[0],x)).groupByKey(25).take(2) >> >> Here is the return value >> >> >>> dd[(u'KEY_1', > >>> 0x107be0c50>), (u'KEY_2', > >>> at 0x107be0c10>)] >> >> and Here are the iterable contents dd[0][1] >> >> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', >> value=u'e7dc1f2a')Row(key=u'KEY_1', >> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', >> value=u'fb0bc953')...Row(key=u'KEY_1', >> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', >> value=u'd39714d3')Row(key=u'KEY_1', >> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') >> >> My question is how do replace with reduceByKey() and get the same output >> as above? >> >> Santhosh >> >
Re: Replacing groupByKey() with reduceByKey()
Hi Santhosh, If you are not performing any aggregation, then I don't think you can replace your groupbykey with a reducebykey, and as I see you are only grouping and taking 2 values of the result, thus I believe you can't just replace your groupbykey with that. Thanks & Regards Biplob Biswas On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: > I am trying to replace groupByKey() with reudceByKey(), I am a pyspark > and python newbie and I am having a hard time figuring out the lambda > function for the reduceByKey() operation. > > Here is the code > > dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: > (x[0],x)).groupByKey(25).take(2) > > Here is the return value > > >>> dd[(u'KEY_1', >>> 0x107be0c50>), (u'KEY_2', >>> at 0x107be0c10>)] > > and Here are the iterable contents dd[0][1] > > Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', > value=u'e7dc1f2a')Row(key=u'KEY_1', > hash_fn=u'f8891048a9ef8331227b4af080ecd28a', > value=u'fb0bc953')...Row(key=u'KEY_1', > hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', > value=u'd39714d3')Row(key=u'KEY_1', > hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') > > My question is how do replace with reduceByKey() and get the same output > as above? > > Santhosh >
Replacing groupByKey() with reduceByKey()
I am trying to replace groupByKey() with reduceByKey(). I am a PySpark and Python newbie and I am having a hard time figuring out the lambda function for the reduceByKey() operation.

Here is the code:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).take(2)

Here is the return value:

>>> dd
[(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]

and here are the iterable contents, dd[0][1]:

Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
...
Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')

My question is how do I replace this with reduceByKey() and get the same output as above?

Santhosh
why does groupByKey still shuffle if SQL does "Distribute By" on the same columns?
Hi,

I am trying something like this:

val sesDS: Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like this: "select * from sometable DISTRIBUTE BY col1, col2, col3"

Then comes groupByKey:

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

As my select already distributes the data by the same columns that I use in groupByKey, why does groupByKey still do a shuffle? Is this an issue, or am I missing something?

Regards,
Dibyendu
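One way to see what is happening is to compare physical plans. The typed groupByKey inserts an AppendColumns node that computes a new key column from the lambda, so the planner generally cannot prove that the existing hash partitioning on col1/col2/col3 satisfies the required distribution of that computed key and inserts another Exchange; the untyped groupBy on the same columns can often reuse the DISTRIBUTE BY partitioning. A hedged, self-contained Scala sketch (the table and case-class names are placeholders for the poster's):

import org.apache.spark.sql.SparkSession

case class XXX(col1: String, col2: String, col3: String, metric: Long)

object DistributeByVsGroupByKey {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("distribute-by-vs-groupByKey")
      .master("local[*]").getOrCreate()
    import spark.implicits._

    Seq(XXX("a", "b", "c", 1L), XXX("a", "b", "c", 2L)).toDF().createOrReplaceTempView("sometable")

    val sesDS = spark.sql("select * from sometable distribute by col1, col2, col3").as[XXX]

    // Typed groupByKey: the key is the result of a lambda (AppendColumns), so a fresh
    // Exchange on that computed key usually appears in the plan.
    sesDS.groupByKey(x => (x.col1, x.col2, x.col3)).count().explain()

    // Untyped groupBy on the same columns: the existing hash partitioning from
    // DISTRIBUTE BY can satisfy the aggregation's required distribution, avoiding the shuffle.
    sesDS.groupBy($"col1", $"col2", $"col3").count().explain()
  }
}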
Re: groupByKey vs mapPartitions for efficient grouping within a Partition
groupByKey() is a wide dependency and will cause a full shuffle. It's advised against using this transformation unless your keys are balanced (well-distributed) and you really need a full shuffle. Otherwise, what you want is aggregateByKey() or reduceByKey() (depending on the output). These transformations are backed by combineByKey(), which can perform map-side aggregation.

---
Regards,
Andy

On Mon, Jan 16, 2017 at 2:21 PM, Patrick <titlibat...@gmail.com> wrote:
> Hi,
>
> Does groupByKey have any intelligence associated with it, such that if all the keys reside in the same partition, it will not do the shuffle?
>
> Or should the user write mapPartitions (with Scala groupBy code)?
>
> Which would be more efficient, and what are the memory considerations?
>
> Thanks
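A small Scala sketch of the difference being described: when the end result is a per-key aggregate, reduceByKey and aggregateByKey combine values inside each map task before the shuffle, unlike groupByKey. The sample data is made up.

import org.apache.spark.sql.SparkSession

object MapSideCombineExample {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("map-side-combine").master("local[*]")
      .getOrCreate().sparkContext
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2), ("b", 5)))

    // Shuffles every (key, value) pair, then groups on the reduce side:
    val sums1 = pairs.groupByKey().mapValues(_.sum)

    // Partial sums are computed map-side, so far less data crosses the network:
    val sums2 = pairs.reduceByKey(_ + _)

    // aggregateByKey when the result type differs from the value type, e.g. (sum, count):
    val sumAndCount = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),      // merge a value into the per-partition accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2))      // merge accumulators from different partitions

    println(sums1.collect().toSeq)
    println(sums2.collect().toSeq)
    println(sumAndCount.collect().toSeq)
  }
}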
groupByKey vs mapPartitions for efficient grouping within a Partition
Hi,

Does groupByKey have any intelligence associated with it, such that if all the keys reside in the same partition, it will not do the shuffle?

Or should the user write mapPartitions (with Scala groupBy code)?

Which would be more efficient, and what are the memory considerations?

Thanks
groupByKey vs reduceByKey
Hi,

I read somewhere that groupByKey() on an RDD disables map-side aggregation, as the aggregation function (appending to a list) does not save any space. However, from my understanding, using something like reduceByKey or combineByKey with a combiner function, we could reduce the data shuffled around. I am wondering why map-side aggregation is disabled for groupByKey() and why it wouldn't save space at the executor where data is received after the shuffle.

cheers
Appu
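A hedged Scala sketch of the point being asked about: groupByKey's "combiner" only appends values to a buffer, so combining before the shuffle does not shrink the data, it only costs memory. Conceptually, groupByKey behaves much like the combineByKey call below with map-side combine turned off (this is an illustration of the idea, not the actual Spark source).

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer

object WhyNoMapSideCombine {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkSession.builder.appName("why-no-map-side-combine")
      .master("local[*]").getOrCreate().sparkContext
    val pairs = sc.parallelize(Seq(("a", "x"), ("a", "y"), ("b", "z")))

    val groupedLike = pairs.combineByKey[ArrayBuffer[String]](
      (v: String) => ArrayBuffer(v),                              // createCombiner: start a buffer
      (buf: ArrayBuffer[String], v: String) => buf += v,          // mergeValue: append (no size reduction)
      (b1: ArrayBuffer[String], b2: ArrayBuffer[String]) => b1 ++= b2,  // mergeCombiners
      new HashPartitioner(pairs.partitions.length),
      mapSideCombine = false)   // combining map-side would not shrink what gets shuffled

    groupedLike.collect().foreach(println)
  }
}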
groupByKey data access size vs reducer number
Hi all,

How does shuffle in Spark 1.6.2 work? I am using groupByKey(partitionSize: Int). groupByKey, a shuffle operation, has a mapper side (M mappers) and a reducer side (R reducers). Here R = partitionSize, and each mapper produces a local file output stored in spark.local.dir. Let's assume the total shuffle data size is D; then each reducer will shuffle-read D/R data.

My question is: when changing R (for example, decreasing R), each reducer reads more data per partition (P = D/R increases as R decreases). Since the data for each reducer comes from every mapper's output, does that mean that, on average, each reducer reads P/M = D/(R*M) data from each mapper?

However, what I observe is not consistent with this model. I used the iostat tool to examine the I/O request size, and found no difference in I/O request size when decreasing R. Does anyone know any details on shuffle? Many thanks!

R = 6000: http://apache-spark-user-list.1001560.n3.nabble.com/file/n28140/iostat_m14_reduceBy2_core6_readSize.png
R = 3000: http://apache-spark-user-list.1001560.n3.nabble.com/file/n28140/iostat_m14_reduceBy4_core6_readSize.png

As seen from the two iostat plots, the average I/O request sizes for the two reducer numbers are the same, 250 sectors (250 * 512 B/sector = 128 KB).
task not serializable in case of groupByKey() + mapGroups + map?
With the following simple code:

val a = sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{
  val simpley = yyy.value
  1
})

I'm seeing this error:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 56 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
  - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value:
    == Parsed Logical Plan ==
    'AppendColumns <function1>, unresolveddeserializer(newInstance(class scala.Tuple2)), [input[0, int, true] AS value#210]
    +- LogicalRDD [_1#201, _2#202]
    == Analyzed Logical Plan ==
    _1: int, _2: int, value: int
    AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
    +- LogicalRDD [_1#201, _2#202]
    == Optimized Logical Plan ==
    AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
    +- LogicalRDD [_1#201, _2#202]
    == Physical Plan ==
    AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
    +- Scan ExistingRDD[_1#201,_2#202])
  - field (class: org.apache.spark.sql.KeyValueGroupedDataset, name: queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
  - object (class org.apache.spark.sql.KeyValueGroupedDataset, org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
  - field (class: $iw, name: grouped, type: class org.apache.spark.sql.KeyValueGroupedDataset)
  - object (class $iw, $iw@7b1c13e4)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@3e9a0c21)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@218cc682)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@2ecedd08)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@79efd402)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@d81976c)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@2d5d6e2a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@74dc6a7a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@5e220d85)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@1c790a4f)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@1d954b06)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@1343c904)
  - field (class: $line115.$read, name: $iw, type: class $iw)
  - object (class $line115.$read, $line115.$read@42497908)
  - field (class: $iw, name: $line115$read, type: class $line115.$read)
  - object (class $iw, $iw@af36da5)
  - field (class: $iw, name: $outer, type: class $iw)
  - object (class $iw, $iw@2fd5b99a)
  - field (class: $anonfun$1, name: $outer, type: class $iw)
  - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 65 more
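The serialization stack shows the map closure dragging in the REPL wrapper objects ($iw), one of which holds the KeyValueGroupedDataset and therefore its non-serializable QueryExecution. One hedged workaround is to keep the lambda from reaching those wrappers, for example by putting the pipeline inside a regular object (or a compiled jar); the names below are made up for illustration.

import org.apache.spark.sql.SparkSession

object GroupThenMap {
  def run(spark: SparkSession): Array[Int] = {
    import spark.implicits._

    val a = Seq((1, 2), (3, 4)).toDS()
    val yyy = spark.sparkContext.broadcast(1)

    val mappedGroups = a
      .groupByKey(_._1)
      .mapGroups((k, _) => (k, 1))

    // The closure below only captures the broadcast variable, which is serializable;
    // it has no $outer chain back to a REPL wrapper holding the grouped Dataset.
    mappedGroups.rdd
      .map { _ => yyy.value }
      .collect()
  }
}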
Predict a single vector with the new spark.ml API to avoid groupByKey() after a flatMap()?
Is there a way to predict a single vector with the new spark.ml API? In my case it's because I want to do this within a map() to avoid calling groupByKey() after a flatMap():

Current code (pyspark):

% Given 'model', 'rdd', and a function 'split_element' that splits an element of the RDD into a list of elements (and assuming
% each element has both a value and a key so that groupByKey will work to merge them later)
split_rdd = rdd.flatMap(split_element)
split_results = model.transform(split_rdd.toDF()).rdd
return split_results.groupByKey()

Desired code:

split_rdd = rdd.map(split_element)
split_results = split_rdd.map(lambda elem_list: [model.transformOne(elem) for elem in elem_list])
return split_results
Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0
Thanks, pair_rdd.rdd.groupByKey() did the trick. On Wed, Aug 10, 2016 at 8:24 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > So it looks like (despite the name) pair_rdd is actually a Dataset - my > guess is you might have a map on a dataset up above which used to return an > RDD but now returns another dataset or an unexpected implicit conversion. > Just add rdd() before the groupByKey call to push it into an RDD. That > being said - groupByKey generally is an anti-pattern so please be careful > with it. > > On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> Here is the offending line: >> >> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: >> Iterable[MyData]) => { >> ... >> >> >> [error] .scala:249: overloaded method value groupByKey with >> alternatives: >> [error] [K](func: >> org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, >> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K >> ])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] >> >> [error] [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4: >> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, >> aaa.MyData)] >> [error] cannot be applied to () >> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: >> MyKey, hd_iter: Iterable[MyData]) => { >> [error] ^ >> [warn] .scala:249: non-variable type argument aaa.MyData in >> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData]) >> is unchecked since it is eliminated by erasure >> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: >> MyKey, hd_iter: Iterable[MyData]) => { >> [warn] >> ^ >> [warn] one warning found >> >> >> I can't see any obvious API change... what is the problem? >> >> Thanks, >> Arun >> > > > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau >
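A minimal Scala sketch of the 1.6 -> 2.0 difference this compile error points at: on an RDD, groupByKey() takes no arguments, while the Dataset method of the same name needs a key function (and an encoder), so a value that silently became a Dataset fails with "cannot be applied to ()". The sample data is made up.

import org.apache.spark.sql.SparkSession

object GroupByKeyApis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("groupByKey-apis").master("local[*]").getOrCreate()
    import spark.implicits._

    val pairRdd = spark.sparkContext.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    pairRdd.groupByKey()        // RDD API: groups by the tuple's first element, no arguments needed

    val pairDs = pairRdd.toDS()
    pairDs.groupByKey(_._1)     // Dataset API: a key function (plus an implicit encoder) is required
    pairDs.rdd.groupByKey()     // or drop back to the RDD API, as suggested in this thread
  }
}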
Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0
So it looks like (despite the name) pair_rdd is actually a Dataset - my guess is you might have a map on a dataset up above which used to return an RDD but now returns another dataset or an unexpected implicit conversion. Just add rdd() before the groupByKey call to push it into an RDD. That being said - groupByKey generally is an anti-pattern so please be careful with it. On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > Here is the offending line: > > val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: > Iterable[MyData]) => { > ... > > > [error] .scala:249: overloaded method value groupByKey with > alternatives: > [error] [K](func: org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, > aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[ > K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] > > [error] [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4: > org.apache.spark.sql.Encoder[K])org.apache.spark.sql. > KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] > [error] cannot be applied to () > [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: > MyKey, hd_iter: Iterable[MyData]) => { > [error] ^ > [warn] .scala:249: non-variable type argument aaa.MyData in > type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData]) > is unchecked since it is eliminated by erasure > [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, > hd_iter: Iterable[MyData]) => { > [warn] > ^ > [warn] one warning found > > > I can't see any obvious API change... what is the problem? > > Thanks, > Arun > -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau
groupByKey() compile error after upgrading from 1.6.2 to 2.0.0
Here is the offending line: val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: Iterable[MyData]) => { ... [error] .scala:249: overloaded method value groupByKey with alternatives: [error] [K](func: org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] [error] [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] [error] cannot be applied to () [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, hd_iter: Iterable[MyData]) => { [error] ^ [warn] .scala:249: non-variable type argument aaa.MyData in type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData]) is unchecked since it is eliminated by erasure [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, hd_iter: Iterable[MyData]) => { [warn] ^ [warn] one warning found I can't see any obvious API change... what is the problem? Thanks, Arun
Re: groupByKey returns an emptyRDD
Can you give us a bit more information ? how you packaged the code into jar command you used for execution version of Spark related log snippet Thanks On Mon, Jun 6, 2016 at 10:43 AM, Daniel Haviv < daniel.ha...@veracity-group.com> wrote: > Hi, > I'm wrapped the following code into a jar: > > val test = sc.parallelize(Seq(("daniel", "a"), ("daniel", "b"), ("test", > "1)"))) > > val agg = test.groupByKey() > agg.collect.foreach(r=>{println(r._1)}) > > > The result of groupByKey is an empty RDD, when I'm trying the same code using > the spark-shell it's running as expected. > > > Any ideas? > > > Thank you, > > Daniel > >
groupByKey returns an emptyRDD
Hi,
I've wrapped the following code into a jar:

val test = sc.parallelize(Seq(("daniel", "a"), ("daniel", "b"), ("test", "1)")))

val agg = test.groupByKey()
agg.collect.foreach(r=>{println(r._1)})

The result of groupByKey is an empty RDD; when I try the same code in the spark-shell it runs as expected.

Any ideas?

Thank you,
Daniel
Re: Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?
If you put this into a dataframe then you may be able to use one hot encoding and treat these as categorical features. I believe that the ml pipeline components use project tungsten so the performance will be very fast. After you process the result on the dataframe you would then need to assemble your desired format. On May 3, 2016 4:29 PM, "Bibudh Lahiri" <bibudhlah...@gmail.com> wrote: > Hi, > I have multiple procedure codes that a patient has undergone, in an RDD > with a different row for each combination of patient and procedure. I am > trying to covert this data to the LibSVM format, so that the resultant > looks as follows: > > "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1" > > where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given > patient has undergone. Note that Spark needs these codes to be one-based > and in ascending order, so I am using a combination of groupByKey() and > mapValues() to do this conversion as follows: > > procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures) > > where combine_procedures() is defined as: > > def combine_procedures(l_procs): > ascii_procs = map(lambda x: int(custom_encode(x)), l_procs) > return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)]) > > Note that this reduction is neither commutative nor associative, since > combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not > going to work. > Can someone suggest some faster alternative to the combination > of groupByKey() and mapValues() for this? > > Thanks >Bibudh > > > -- > Bibudh Lahiri > Senior Data Scientist, Impetus Technolgoies > 720 University Avenue, Suite 130 > Los Gatos, CA 95129 > http://knowthynumbers.blogspot.com/ > >
Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?
Hi,
I have multiple procedure codes that a patient has undergone, in an RDD with a different row for each combination of patient and procedure. I am trying to convert this data to the LibSVM format, so that the result looks as follows:

"0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"

where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given patient has undergone. Note that Spark needs these codes to be one-based and in ascending order, so I am using a combination of groupByKey() and mapValues() to do this conversion as follows:

procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)

where combine_procedures() is defined as:

def combine_procedures(l_procs):
    ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
    return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])

Note that this reduction is neither commutative nor associative, since combining "29:1 30:1" and "32:1 110:1" into "32:1 110:1 29:1 30:1" is not going to work. Can someone suggest a faster alternative to the combination of groupByKey() and mapValues() for this?

Thanks
Bibudh

--
Bibudh Lahiri
Senior Data Scientist, Impetus Technologies
720 University Avenue, Suite 130
Los Gatos, CA 95129
http://knowthynumbers.blogspot.com/
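One way to sidestep the non-associativity is to defer the order-dependent part (sorting and string formatting) to a final mapValues: the per-key merge then becomes a plain set union, which is commutative and associative and can use aggregateByKey's map-side combining. The thread is PySpark, but here is a hedged Scala sketch of the idea; encodeProc stands in for the poster's custom_encode, and the sample rows are made up.

import org.apache.spark.sql.SparkSession

object LibsvmLines {
  def encodeProc(code: String): Int = code.toInt   // placeholder for custom_encode

  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("libsvm-lines").master("local[*]")
      .getOrCreate().sparkContext
    val procedures = sc.parallelize(Seq(("patient1", "29"), ("patient1", "110"), ("patient1", "30")))

    val libsvm = procedures
      .aggregateByKey(Set.empty[Int])(
        (s, code) => s + encodeProc(code),   // merge one procedure code into the per-key set
        (s1, s2) => s1 ++ s2)                // set union: order does not matter
      .mapValues(codes => codes.toSeq.sorted.map(c => s"$c:1").mkString(" "))  // sort once at the end

    libsvm.collect().foreach { case (patient, features) => println(s"$patient -> 0 $features") }
  }
}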
Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala
Hi Vishal, Thanks for the solution. I was able to get it working for my scenario. Regarding the Task not serializable error, I still get it when I declare a function outside the main method. However, if I declare it inside the main "val func = {}", it is working fine for me. In case you have any insight to share on the same, then please do share it. Thanks for the help. Regards, Neha On Wed, Jan 20, 2016 at 11:39 AM, Vishal Maru <vzm...@gmail.com> wrote: > It seems Spark is not able to serialize your function code to worker nodes. > > I have tried to put a solution in simple set of commands. Maybe you can > combine last four line into function. > > > val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"), (1,"B",">20 & > <40","0"), (1,"C",">20 & <40","0"), (1,"C",">20 & <40","0"), > (2,"A","<20","0"), (3,"B",">20 & <40","1"), (3,"B",">40","2")) > > val rdd = sc.parallelize(arr) > > val prdd = rdd.map(a => (a._1,a)) > val totals = prdd.groupByKey.map(a => (a._1, a._2.size)) > > var n1 = rdd.map(a => ((a._1, a._2), 1) ) > var n2 = n1.reduceByKey(_+_).map(a => (a._1._1, (a._1._2, a._2))) > var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble > / a._2._2))) > var n4 = n3.map(a => (a._1, a._2._1 + ":" + > a._2._2.toString)).reduceByKey((a, b) => a + "|" + b) > > n4.collect.foreach(println) > > > > > On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com> > wrote: > >> Hi, >> >> I have a scenario wherein my dataset has around 30 columns. It is >> basically user activity information. I need to group the information by >> each user and then for each column/activity parameter I need to find the >> percentage affinity for each value in that column for that user. Below is >> the sample input and output. >> >> UserId C1 C2 C3 >> 1 A <20 0 >> 1 A >20 & <40 1 >> 1 B >20 & <40 0 >> 1 C >20 & <40 0 >> 1 C >20 & <40 0 >> 2 A <20 0 >> 3 B >20 & <40 1 >> 3 B >40 2 >> >> >> >> >> >> >> >> >> Output >> >> >> 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2 >> 2 A:1 <20:1 0:01 >> 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5 >> >> Presently this is how I am calculating these values: >> Group by UserId and C1 and compute values for C1 for all the users, then >> do a group by by Userid and C2 and find the fractions for C2 for each user >> and so on. This approach is quite slow. Also the number of records for >> each user will be at max 30. So I would like to take a second approach >> wherein I do a groupByKey and pass the entire list of records for each key >> to a function which computes all the percentages for each column for each >> user at once. Below are the steps I am trying to follow: >> >> 1. Dataframe1 => group by UserId , find the counts of records for each >> user. Join the results back to the input so that counts are available with >> each record >> 2. 
Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2)) >> >> def myUserAggregator(rows: Iterable[Row]): >> scala.collection.mutable.Map[Int,String] = { >> val returnValue = scala.collection.mutable.Map[Int,String]() >> if (rows != null) { >> val activityMap = scala.collection.mutable.Map[Int, >> scala.collection.mutable.Map[String, >> Int]]().withDefaultValue(scala.collection.mutable.Map[String, >> Int]().withDefaultValue(0)) >> >> val rowIt = rows.iterator >> var sentCount = 1 >> for (row <- rowIt) { >> sentCount = row(29).toString().toInt >> for (i <- 0 until row.length) { >> var m = activityMap(i) >> if (activityMap(i) == null) { >> m = collection.mutable.Map[String, >> Int]().withDefaultValue(0) >> } >> m(row(i).toString()) += 1 >> activityMap.update(i, m) >> } >> } >> var activityPPRow: Row = Row() >> for((k,v) <- activityMap) { >> var rowVal:String = "" >> for((a,b) <- v) { >> rowVal += rowVal + a + ":" + b
Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala
It seems Spark is not able to serialize your function code to worker nodes. I have tried to put a solution in simple set of commands. Maybe you can combine last four line into function. val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"), (1,"B",">20 & <40","0"), (1,"C",">20 & <40","0"), (1,"C",">20 & <40","0"), (2,"A","<20","0"), (3,"B",">20 & <40","1"), (3,"B",">40","2")) val rdd = sc.parallelize(arr) val prdd = rdd.map(a => (a._1,a)) val totals = prdd.groupByKey.map(a => (a._1, a._2.size)) var n1 = rdd.map(a => ((a._1, a._2), 1) ) var n2 = n1.reduceByKey(_+_).map(a => (a._1._1, (a._1._2, a._2))) var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2))) var n4 = n3.map(a => (a._1, a._2._1 + ":" + a._2._2.toString)).reduceByKey((a, b) => a + "|" + b) n4.collect.foreach(println) On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com> wrote: > Hi, > > I have a scenario wherein my dataset has around 30 columns. It is > basically user activity information. I need to group the information by > each user and then for each column/activity parameter I need to find the > percentage affinity for each value in that column for that user. Below is > the sample input and output. > > UserId C1 C2 C3 > 1 A <20 0 > 1 A >20 & <40 1 > 1 B >20 & <40 0 > 1 C >20 & <40 0 > 1 C >20 & <40 0 > 2 A <20 0 > 3 B >20 & <40 1 > 3 B >40 2 > > > > > > > > > Output > > > 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2 > 2 A:1 <20:1 0:01 > 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5 > > Presently this is how I am calculating these values: > Group by UserId and C1 and compute values for C1 for all the users, then > do a group by by Userid and C2 and find the fractions for C2 for each user > and so on. This approach is quite slow. Also the number of records for > each user will be at max 30. So I would like to take a second approach > wherein I do a groupByKey and pass the entire list of records for each key > to a function which computes all the percentages for each column for each > user at once. Below are the steps I am trying to follow: > > 1. Dataframe1 => group by UserId , find the counts of records for each > user. Join the results back to the input so that counts are available with > each record > 2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2)) > > def myUserAggregator(rows: Iterable[Row]): > scala.collection.mutable.Map[Int,String] = { > val returnValue = scala.collection.mutable.Map[Int,String]() > if (rows != null) { > val activityMap = scala.collection.mutable.Map[Int, > scala.collection.mutable.Map[String, > Int]]().withDefaultValue(scala.collection.mutable.Map[String, > Int]().withDefaultValue(0)) > > val rowIt = rows.iterator > var sentCount = 1 > for (row <- rowIt) { > sentCount = row(29).toString().toInt > for (i <- 0 until row.length) { > var m = activityMap(i) > if (activityMap(i) == null) { > m = collection.mutable.Map[String, > Int]().withDefaultValue(0) > } > m(row(i).toString()) += 1 > activityMap.update(i, m) > } > } > var activityPPRow: Row = Row() > for((k,v) <- activityMap) { > var rowVal:String = "" > for((a,b) <- v) { > rowVal += rowVal + a + ":" + b/sentCount + "|" > } > returnValue.update(k, rowVal) > // activityPPRow.apply(k) = rowVal > } > > } > return returnValue > } > > When I run step 2 I get the following error. I am new to Scala and Spark > and am unable to figure out how to pass the Iterable[Row] to a function and > get back the results. 
> > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) > at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318) > at org.apache.sp
How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala
Hi, I have a scenario wherein my dataset has around 30 columns. It is basically user activity information. I need to group the information by each user and then for each column/activity parameter I need to find the percentage affinity for each value in that column for that user. Below is the sample input and output. UserId C1 C2 C3 1 A <20 0 1 A >20 & <40 1 1 B >20 & <40 0 1 C >20 & <40 0 1 C >20 & <40 0 2 A <20 0 3 B >20 & <40 1 3 B >40 2 Output 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2 2 A:1 <20:1 0:01 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5 Presently this is how I am calculating these values: Group by UserId and C1 and compute values for C1 for all the users, then do a group by by Userid and C2 and find the fractions for C2 for each user and so on. This approach is quite slow. Also the number of records for each user will be at max 30. So I would like to take a second approach wherein I do a groupByKey and pass the entire list of records for each key to a function which computes all the percentages for each column for each user at once. Below are the steps I am trying to follow: 1. Dataframe1 => group by UserId , find the counts of records for each user. Join the results back to the input so that counts are available with each record 2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2)) def myUserAggregator(rows: Iterable[Row]): scala.collection.mutable.Map[Int,String] = { val returnValue = scala.collection.mutable.Map[Int,String]() if (rows != null) { val activityMap = scala.collection.mutable.Map[Int, scala.collection.mutable.Map[String, Int]]().withDefaultValue(scala.collection.mutable.Map[String, Int]().withDefaultValue(0)) val rowIt = rows.iterator var sentCount = 1 for (row <- rowIt) { sentCount = row(29).toString().toInt for (i <- 0 until row.length) { var m = activityMap(i) if (activityMap(i) == null) { m = collection.mutable.Map[String, Int]().withDefaultValue(0) } m(row(i).toString()) += 1 activityMap.update(i, m) } } var activityPPRow: Row = Row() for((k,v) <- activityMap) { var rowVal:String = "" for((a,b) <- v) { rowVal += rowVal + a + ":" + b/sentCount + "|" } returnValue.update(k, rowVal) // activityPPRow.apply(k) = rowVal } } return returnValue } When I run step 2 I get the following error. I am new to Scala and Spark and am unable to figure out how to pass the Iterable[Row] to a function and get back the results. 
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) at org.apache.spark.rdd.RDD.map(RDD.scala:317) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:97) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:102) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:104) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:106) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:108) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:112) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:114) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:116) .. Thanks for the help. Regards, Neha Mehta
Spark Streaming: routing by key without groupByKey
I have a requirement to route a paired DStream through a series of map and flatMap operations such that entries with the same key go to the same thread within the same batch. The closest I can come up with is groupByKey().flatMap(_._2), but this cuts throughput by 50%.

When I think about it, groupByKey is more than I need. With groupByKey the same thread sees all events with key Alice at once, and only Alice. For my requirement, if there are Bob and Charlie in between, that's still OK. This seems like a common routing requirement and shouldn't cause a 50% performance hit. Is there a way to construct the stream like this that I'm not aware of?

I have read https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html but reduceByKey isn't the solution since we are not doing aggregation. Our stream is a chain of map and flatMap (some of them with state).
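If the requirement is only "records with the same key are processed by the same task within a batch", one hedged option is to hash-partition each batch's RDD by key and then use mapPartitions, which keeps the per-key locality without materialising groupByKey's per-key iterables. A Scala sketch, where the key/value types and the process function are placeholders for the real map/flatMap chain:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

object KeyRouting {
  // All records for a given key land in the same partition, hence the same task (thread),
  // for that batch; nothing is buffered into per-key Iterables along the way.
  def routeByKey(pairs: DStream[(String, String)], parallelism: Int)(
      process: ((String, String)) => String): DStream[String] =
    pairs.transform { rdd =>
      rdd.partitionBy(new HashPartitioner(parallelism))
         .mapPartitions(_.map(process))
    }
}

// Hypothetical usage: KeyRouting.routeByKey(eventStream, 32)(handleEvent)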
Re: groupByKey does not work?
I suspect this is another instance of case classes not working as expected between the driver and executor when used with spark-shell. Search JIRA for some back story. On Tue, Jan 5, 2016 at 12:42 AM, Arun Luthra <arun.lut...@gmail.com> wrote: > Spark 1.5.0 > > data: > > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > > spark-shell: > > spark-shell \ > --num-executors 2 \ > --driver-memory 1g \ > --executor-memory 10g \ > --executor-cores 8 \ > --master yarn-client > > > case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, > f4:Char, f5:Char, f6:String) > case class Myvalue(count1:Long, count2:Long, num:Double) > > val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { > val spl = line.split("\\|", -1) > val k = spl(0).split(",") > val v = spl(1).split(",") > (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, > k(5)(0).toChar, k(6)(0).toChar, k(7)), > Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) > ) > }} > > myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) > }.collect().foreach(println) > > (Mykey(p1,lo1,8,0,4,0,5,20150901),1) > (Mykey(p1,lo1,8,0,4,0,5,20150901),1) > (Mykey(p1,lo3,8,0,4,0,5,20150901),1) > (Mykey(p1,lo3,8,0,4,0,5,20150901),1) > (Mykey(p1,lo4,8,0,4,0,5,20150901),1) > (Mykey(p1,lo4,8,0,4,0,5,20150901),1) > (Mykey(p1,lo2,8,0,4,0,5,20150901),1) > (Mykey(p1,lo2,8,0,4,0,5,20150901),1) > > > > You can see that each key is repeated 2 times but each key should only > appear once. > > Arun > > On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: >> >> Can you give a bit more information ? >> >> Release of Spark you're using >> Minimal dataset that shows the problem >> >> Cheers >> >> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote: >>> >>> I tried groupByKey and noticed that it did not group all values into the >>> same group. >>> >>> In my test dataset (a Pair rdd) I have 16 records, where there are only 4 >>> distinct keys, so I expected there to be 4 records in the groupByKey object, >>> but instead there were 8. Each of the 4 distinct keys appear 2 times. >>> >>> Is this the expected behavior? I need to be able to get ALL values >>> associated with each key grouped into a SINGLE record. Is it possible? >>> >>> Arun >>> >>> p.s. reducebykey will not be sufficient for me >> >> > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
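Two hedged workaround sketches for the spark-shell case-class-key behaviour described above; they reuse the myrdd and Mykey definitions from the quoted post rather than redefining them.

// 1) Key by a plain tuple, whose equals/hashCode are unaffected by REPL class wrapping:
val byTuple = myrdd
  .map { case (k, v) => ((k.uname, k.lo, k.f1, k.f2, k.f3, k.f4, k.f5, k.f6), v) }
  .groupByKey()

// 2) Or move Mykey/Myvalue into compiled code shipped with --jars, so the driver and every
//    executor share a single class definition instead of per-REPL-line generated classes.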
Re: groupByKey does not work?
Can you give a bit more information ? Release of Spark you're using Minimal dataset that shows the problem Cheers On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > I tried groupByKey and noticed that it did not group all values into the > same group. > > In my test dataset (a Pair rdd) I have 16 records, where there are only 4 > distinct keys, so I expected there to be 4 records in the groupByKey > object, but instead there were 8. Each of the 4 distinct keys appear 2 > times. > > Is this the expected behavior? I need to be able to get ALL values > associated with each key grouped into a SINGLE record. Is it possible? > > Arun > > p.s. reducebykey will not be sufficient for me >
Re: groupByKey does not work?
Could you please post the associated code and output? On Mon, Jan 4, 2016 at 3:55 PM Arun Luthra <arun.lut...@gmail.com> wrote: > I tried groupByKey and noticed that it did not group all values into the > same group. > > In my test dataset (a Pair rdd) I have 16 records, where there are only 4 > distinct keys, so I expected there to be 4 records in the groupByKey > object, but instead there were 8. Each of the 4 distinct keys appear 2 > times. > > Is this the expected behavior? I need to be able to get ALL values > associated with each key grouped into a SINGLE record. Is it possible? > > Arun > > p.s. reducebykey will not be sufficient for me >
groupByKey does not work?
I tried groupByKey and noticed that it did not group all values into the same group. In my test dataset (a Pair rdd) I have 16 records, where there are only 4 distinct keys, so I expected there to be 4 records in the groupByKey object, but instead there were 8. Each of the 4 distinct keys appear 2 times. Is this the expected behavior? I need to be able to get ALL values associated with each key grouped into a SINGLE record. Is it possible? Arun p.s. reducebykey will not be sufficient for me
Re: groupByKey does not work?
Spark 1.5.0 data: p1,lo1,8,0,4,0,5,20150901|5,1,1.0 p1,lo2,8,0,4,0,5,20150901|5,1,1.0 p1,lo3,8,0,4,0,5,20150901|5,1,1.0 p1,lo4,8,0,4,0,5,20150901|5,1,1.0 p1,lo1,8,0,4,0,5,20150901|5,1,1.0 p1,lo2,8,0,4,0,5,20150901|5,1,1.0 p1,lo3,8,0,4,0,5,20150901|5,1,1.0 p1,lo4,8,0,4,0,5,20150901|5,1,1.0 p1,lo1,8,0,4,0,5,20150901|5,1,1.0 p1,lo2,8,0,4,0,5,20150901|5,1,1.0 p1,lo3,8,0,4,0,5,20150901|5,1,1.0 p1,lo4,8,0,4,0,5,20150901|5,1,1.0 p1,lo1,8,0,4,0,5,20150901|5,1,1.0 p1,lo2,8,0,4,0,5,20150901|5,1,1.0 p1,lo3,8,0,4,0,5,20150901|5,1,1.0 p1,lo4,8,0,4,0,5,20150901|5,1,1.0 spark-shell: spark-shell \ --num-executors 2 \ --driver-memory 1g \ --executor-memory 10g \ --executor-cores 8 \ --master yarn-client case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, f4:Char, f5:Char, f6:String) case class Myvalue(count1:Long, count2:Long, num:Double) val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { val spl = line.split("\\|", -1) val k = spl(0).split(",") val v = spl(1).split(",") (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, k(5)(0).toChar, k(6)(0).toChar, k(7)), Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) ) }} myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) }.collect().foreach(println) (Mykey(p1,lo1,8,0,4,0,5,20150901),1) (Mykey(p1,lo1,8,0,4,0,5,20150901),1) (Mykey(p1,lo3,8,0,4,0,5,20150901),1) (Mykey(p1,lo3,8,0,4,0,5,20150901),1) (Mykey(p1,lo4,8,0,4,0,5,20150901),1) (Mykey(p1,lo4,8,0,4,0,5,20150901),1) (Mykey(p1,lo2,8,0,4,0,5,20150901),1) (Mykey(p1,lo2,8,0,4,0,5,20150901),1) You can see that each key is repeated 2 times but each key should only appear once. Arun On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Can you give a bit more information ? > > Release of Spark you're using > Minimal dataset that shows the problem > > Cheers > > On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > >> I tried groupByKey and noticed that it did not group all values into the >> same group. >> >> In my test dataset (a Pair rdd) I have 16 records, where there are only 4 >> distinct keys, so I expected there to be 4 records in the groupByKey >> object, but instead there were 8. Each of the 4 distinct keys appear 2 >> times. >> >> Is this the expected behavior? I need to be able to get ALL values >> associated with each key grouped into a SINGLE record. Is it possible? >> >> Arun >> >> p.s. reducebykey will not be sufficient for me >> > >
Re: groupByKey does not work?
Could you try simplifying the key and seeing if that makes any difference? Make it just a string or an int so we can count out any issues in object equality. On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote: > Spark 1.5.0 > > data: > > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > p1,lo1,8,0,4,0,5,20150901|5,1,1.0 > p1,lo2,8,0,4,0,5,20150901|5,1,1.0 > p1,lo3,8,0,4,0,5,20150901|5,1,1.0 > p1,lo4,8,0,4,0,5,20150901|5,1,1.0 > > spark-shell: > > spark-shell \ > --num-executors 2 \ > --driver-memory 1g \ > --executor-memory 10g \ > --executor-cores 8 \ > --master yarn-client > > > case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, > f4:Char, f5:Char, f6:String) > case class Myvalue(count1:Long, count2:Long, num:Double) > > val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { > val spl = line.split("\\|", -1) > val k = spl(0).split(",") > val v = spl(1).split(",") > (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, > k(5)(0).toChar, k(6)(0).toChar, k(7)), > Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) > ) > }} > > myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) > }.collect().foreach(println) > > (Mykey(p1,lo1,8,0,4,0,5,20150901),1) > > (Mykey(p1,lo1,8,0,4,0,5,20150901),1) > (Mykey(p1,lo3,8,0,4,0,5,20150901),1) > (Mykey(p1,lo3,8,0,4,0,5,20150901),1) > (Mykey(p1,lo4,8,0,4,0,5,20150901),1) > (Mykey(p1,lo4,8,0,4,0,5,20150901),1) > (Mykey(p1,lo2,8,0,4,0,5,20150901),1) > (Mykey(p1,lo2,8,0,4,0,5,20150901),1) > > > > You can see that each key is repeated 2 times but each key should only > appear once. > > Arun > > On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> Can you give a bit more information ? >> >> Release of Spark you're using >> Minimal dataset that shows the problem >> >> Cheers >> >> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> >> wrote: >> >>> I tried groupByKey and noticed that it did not group all values into the >>> same group. >>> >>> In my test dataset (a Pair rdd) I have 16 records, where there are only >>> 4 distinct keys, so I expected there to be 4 records in the groupByKey >>> object, but instead there were 8. Each of the 4 distinct keys appear 2 >>> times. >>> >>> Is this the expected behavior? I need to be able to get ALL values >>> associated with each key grouped into a SINGLE record. Is it possible? >>> >>> Arun >>> >>> p.s. reducebykey will not be sufficient for me >>> >> >> >
Re: groupByKey does not work?
If I simplify the key to String column with values lo1, lo2, lo3, lo4, it works correctly. On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com> wrote: > Could you try simplifying the key and seeing if that makes any difference? > Make it just a string or an int so we can count out any issues in object > equality. > > On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote: > >> Spark 1.5.0 >> >> data: >> >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> >> spark-shell: >> >> spark-shell \ >> --num-executors 2 \ >> --driver-memory 1g \ >> --executor-memory 10g \ >> --executor-cores 8 \ >> --master yarn-client >> >> >> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, >> f4:Char, f5:Char, f6:String) >> case class Myvalue(count1:Long, count2:Long, num:Double) >> >> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { >> val spl = line.split("\\|", -1) >> val k = spl(0).split(",") >> val v = spl(1).split(",") >> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, >> k(5)(0).toChar, k(6)(0).toChar, k(7)), >> Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) >> ) >> }} >> >> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) >> }.collect().foreach(println) >> >> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >> >> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >> >> >> >> You can see that each key is repeated 2 times but each key should only >> appear once. >> >> Arun >> >> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Can you give a bit more information ? >>> >>> Release of Spark you're using >>> Minimal dataset that shows the problem >>> >>> Cheers >>> >>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> >>> wrote: >>> >>>> I tried groupByKey and noticed that it did not group all values into >>>> the same group. >>>> >>>> In my test dataset (a Pair rdd) I have 16 records, where there are only >>>> 4 distinct keys, so I expected there to be 4 records in the groupByKey >>>> object, but instead there were 8. Each of the 4 distinct keys appear 2 >>>> times. >>>> >>>> Is this the expected behavior? I need to be able to get ALL values >>>> associated with each key grouped into a SINGLE record. Is it possible? >>>> >>>> Arun >>>> >>>> p.s. reducebykey will not be sufficient for me >>>> >>> >>> >>
Re: groupByKey does not work?
That's interesting. I would try case class Mykey(uname:String) case class Mykey(uname:String, c1:Char) case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, f4:Char, f5:Char, f6:String) In that order. It seems like there is some issue with equality between keys. On Mon, Jan 4, 2016 at 5:05 PM Arun Luthra <arun.lut...@gmail.com> wrote: > If I simplify the key to String column with values lo1, lo2, lo3, lo4, it > works correctly. > > On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com > > wrote: > >> Could you try simplifying the key and seeing if that makes any >> difference? Make it just a string or an int so we can count out any issues >> in object equality. >> >> On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote: >> >>> Spark 1.5.0 >>> >>> data: >>> >>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >>> >>> spark-shell: >>> >>> spark-shell \ >>> --num-executors 2 \ >>> --driver-memory 1g \ >>> --executor-memory 10g \ >>> --executor-cores 8 \ >>> --master yarn-client >>> >>> >>> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, >>> f4:Char, f5:Char, f6:String) >>> case class Myvalue(count1:Long, count2:Long, num:Double) >>> >>> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { >>> val spl = line.split("\\|", -1) >>> val k = spl(0).split(",") >>> val v = spl(1).split(",") >>> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, >>> k(5)(0).toChar, k(6)(0).toChar, k(7)), >>> Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) >>> ) >>> }} >>> >>> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) >>> }.collect().foreach(println) >>> >>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >>> >>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >>> >>> >>> >>> You can see that each key is repeated 2 times but each key should only >>> appear once. >>> >>> Arun >>> >>> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: >>> >>>> Can you give a bit more information ? >>>> >>>> Release of Spark you're using >>>> Minimal dataset that shows the problem >>>> >>>> Cheers >>>> >>>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> >>>> wrote: >>>> >>>>> I tried groupByKey and noticed that it did not group all values into >>>>> the same group. >>>>> >>>>> In my test dataset (a Pair rdd) I have 16 records, where there are >>>>> only 4 distinct keys, so I expected there to be 4 records in the >>>>> groupByKey >>>>> object, but instead there were 8. Each of the 4 distinct keys appear 2 >>>>> times. >>>>> >>>>> Is this the expected behavior? I need to be able to get ALL values >>>>> associated with each key grouped into a SINGLE record. Is it possible? >>>>> >>>>> Arun >>>>> >>>>> p.s. 
reducebykey will not be sufficient for me >>>>> >>>> >>>> >>> >
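For anyone else hitting the same symptom, one way to sidestep the key-equality question entirely is to group on a plain tuple of the raw fields (tuples get structural equals/hashCode from the Scala library) and only build the case class afterwards if it is still needed. A minimal sketch against the sample data above; the parsing mirrors the original snippet, so the field layout is carried over from it rather than assumed fresh:

// Key by a tuple of plain Strings instead of the Mykey case class.
val keyedByTuple = sc.textFile("/user/al733a/mydata.txt").map { line =>
  val spl = line.split("\\|", -1)
  val k = spl(0).split(",")
  val v = spl(1).split(",")
  ((k(0), k(1), k(2), k(3), k(4), k(5), k(6), k(7)),   // uname, lo, f1..f5, f6
   (v(0).toLong, v(1).toLong, v(2).toDouble))          // count1, count2, num
}

keyedByTuple.groupByKey().mapValues(_.size).collect().foreach(println)
// With the 16 sample rows this should print 4 lines, one per distinct key,
// each with a group size of 4.

If the tuple-keyed version groups correctly while the case-class version does not, that points at equals/hashCode on the key (or at how the case class is defined in the shell) rather than at groupByKey itself.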
groupByKey()
Hi, Sorry for the long input, but this is my situation: I have two (key, value) records that I want to groupByKey, but some of the values in the lists disappear and I can't understand why. Both records carry the same key, and each value is a long 0/1 vector; abridged, they look like this (the second vector is identical to the first except for a single 1 near the middle):

(8867989628612931721, [1, 1, 0, 0, 0, ..., 0, 0, 0])
(8867989628612931721, [1, 1, 0, 0, 0, ..., 1, ..., 0, 0, 0])

Result of groupByKey:

(8867989628612931721, [[1, 1, 0, 0, 0, ...
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Stacktrace would be helpful if you can provide that. On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote: > Hi > > I am trying to do pair rdd's, group by the key assign id based on key. > I am using Pyspark with spark 1.3, for some reason, I am getting this > error that I am unable to figure out - any help much appreciated. > > Things I tried (but to no effect), > > 1. make sure I am not doing any conversions on the strings > 2. make sure that the fields used in the key are all there and not > empty string (or else I toss the row out) > > My code is along following lines (split is using stringio to parse > csv, header removes the header row and parse_train is putting the 54 > fields into named tuple after whitespace/quote removal): > > #Error for string argument is thrown on the BB.take(1) where the > groupbykey is evaluated > > A = sc.textFile("train.csv").filter(lambda x:not > isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is > None) > > A.count() > > B = A.map(lambda k: > > ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, > k.srch_children_count,k.srch_room_count), > (k[0:54]))) > BB = B.groupByKey() > BB.take(1) > > > best fahad > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
ortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) On Sun, Oct 18, 2015 at 11:17 PM, Jeff Zhang <zjf...@gmail.com> wrote: > Stacktrace would be helpful if you can provide that. > > > > On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote: >> >> Hi >> >> I am trying to do pair rdd's, group by the key assign id based on key. >> I am using Pyspark with spark 1.3, for some reason, I am getting this >> error that I am unable to figure out - any help much appreciated. >> >> Things I tried (but to no effect), >> >> 1. make sure I am not doing any conversions on the strings >> 2. make sure that the fields used in the key are all there and not >> empty string (or else I toss the row out) >> >> My code is along following lines (split is using stringio to parse >> csv, header removes the header row and parse_train is putting the 54 >> fields into named tuple after whitespace/quote removal): >> >> #Error for string argument is thrown on the BB.take(1) where the >> groupbykey is evaluated >> >> A = sc.textFile("train.csv").filter(lambda x:not >> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is >> None) >> >> A.count() >> >> B = A.map(lambda k: >> >> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, >> k.srch_children_count,k.srch_room_count), >> (k[0:54]))) >> BB = B.groupByKey() >> BB.take(1) >> >> >> best fahad >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> > > > > -- > Best Regards > > Jeff Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Thanks Davies, sure, I can share the code/data in pm - best fahad On Mon, Oct 19, 2015 at 10:52 AM, Davies Liu <dav...@databricks.com> wrote: > Could you simplify the code a little bit so we can reproduce the failure? > (may also have some sample dataset if it depends on them) > > On Sun, Oct 18, 2015 at 10:42 PM, fahad shah <sfaha...@gmail.com> wrote: >> Hi >> >> I am trying to do pair rdd's, group by the key assign id based on key. >> I am using Pyspark with spark 1.3, for some reason, I am getting this >> error that I am unable to figure out - any help much appreciated. >> >> Things I tried (but to no effect), >> >> 1. make sure I am not doing any conversions on the strings >> 2. make sure that the fields used in the key are all there and not >> empty string (or else I toss the row out) >> >> My code is along following lines (split is using stringio to parse >> csv, header removes the header row and parse_train is putting the 54 >> fields into named tuple after whitespace/quote removal): >> >> #Error for string argument is thrown on the BB.take(1) where the >> groupbykey is evaluated >> >> A = sc.textFile("train.csv").filter(lambda x:not >> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is >> None) >> >> A.count() >> >> B = A.map(lambda k: >> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, >> k.srch_children_count,k.srch_room_count), >> (k[0:54]))) >> BB = B.groupByKey() >> BB.take(1) >> >> >> best fahad >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Could you simplify the code a little bit so we can reproduce the failure? (may also have some sample dataset if it depends on them) On Sun, Oct 18, 2015 at 10:42 PM, fahad shah <sfaha...@gmail.com> wrote: > Hi > > I am trying to do pair rdd's, group by the key assign id based on key. > I am using Pyspark with spark 1.3, for some reason, I am getting this > error that I am unable to figure out - any help much appreciated. > > Things I tried (but to no effect), > > 1. make sure I am not doing any conversions on the strings > 2. make sure that the fields used in the key are all there and not > empty string (or else I toss the row out) > > My code is along following lines (split is using stringio to parse > csv, header removes the header row and parse_train is putting the 54 > fields into named tuple after whitespace/quote removal): > > #Error for string argument is thrown on the BB.take(1) where the > groupbykey is evaluated > > A = sc.textFile("train.csv").filter(lambda x:not > isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is > None) > > A.count() > > B = A.map(lambda k: > ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, > k.srch_children_count,k.srch_room_count), (k[0:54]))) > BB = B.groupByKey() > BB.take(1) > > > best fahad > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
pyspark groupbykey throwing error: unpack requires a string argument of length 4
Hi I am trying to do pair rdd's, group by the key assign id based on key. I am using Pyspark with spark 1.3, for some reason, I am getting this error that I am unable to figure out - any help much appreciated. Things I tried (but to no effect), 1. make sure I am not doing any conversions on the strings 2. make sure that the fields used in the key are all there and not empty string (or else I toss the row out) My code is along following lines (split is using stringio to parse csv, header removes the header row and parse_train is putting the 54 fields into named tuple after whitespace/quote removal): #Error for string argument is thrown on the BB.take(1) where the groupbykey is evaluated A = sc.textFile("train.csv").filter(lambda x:not isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is None) A.count() B = A.map(lambda k: ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, k.srch_children_count,k.srch_room_count), (k[0:54]))) BB = B.groupByKey() BB.take(1) best fahad - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Different partition number of GroupByKey leads different result
Hi everyone, I have run into a problem these days and I don't know whether it is a bug in Spark. When I use groupByKey on our SequenceFile data, I find that different partition numbers lead to different results, and the same goes for reduceByKey. I think the problem happens at the shuffle stage. I have read the source code, but still can't find the answer. This is the main code:

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input, classOf[UserWritable], classOf[TagsWritable])
val combinedRdd = rdd.map(s => (s._1.getuserid(), s._2)).groupByKey(num).filter(_._1 == uid)

num is the number of partitions and uid is a filter id for result comparison. TagsWritable implements WritableComparable and Serializable. When I used groupByKey on a text file, the result was right. Thanks, Devin Huang -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Different partition number of GroupByKey leads different result
Forgive me for not understanding what you mean.The sequence file key is UserWritable,and Value is TagsWritable.Both of them implement WritableComparable and Serializable and rewrite the clone(). The key of string is collected from UserWritable through a map transformation. Have you ever read the spark source code?Which step can lead to data dislocation? > 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道: > > Another guess, since you say the key is String (offline): you are not > cloning the value of TagsWritable. Hadoop reuses the object under the > hood, and so is changing your object value. You can't save references > to the object you get from reading a SequenceFile. > > On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote: >> First guess: your key class does not implement hashCode/equals >> >> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote: >>> Hi everyone, >>> >>> I got a trouble these days,and I don't know whether it is a bug of >>> spark.When I use GroupByKey for our sequenceFile Data,I find that different >>> partition number lead different result, so as ReduceByKey. I think the >>> problem happens on the shuffle stage.I read the source code, but still >>> can't find the answer. >>> >>> >>> this is the main code: >>> >>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input, >>> classOf[UserWritable], classOf[TagsWritable]) >>> val combinedRdd = rdd.map(s => (s._1.getuserid(), >>> s._2)).groupByKey(num).filter(_._1 == uid) >>> >>> num is the number of partition and uid is a filter id for result >>> comparision. >>> TagsWritable implements WritableComparable and Serializable. >>> >>> I used GroupByKey on text file, the result was right. >>> >>> Thanks, >>> Devin Huang >>> >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Different partition number of GroupByKey leads different result
If you are not copying or cloning the value (TagsWritable) object, then that is likely the problem. The value is not immutable and is changed by the InputFormat code reading the file, because it is reused. On Fri, Oct 9, 2015 at 11:04 AM, Devin Huang <hos...@163.com> wrote: > Forgive me for not understanding what you mean.The sequence file key is > UserWritable,and Value is TagsWritable.Both of them implement > WritableComparable and Serializable and rewrite the clone(). > The key of string is collected from UserWritable through a map transformation. > > Have you ever read the spark source code?Which step can lead to data > dislocation? > >> 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道: >> >> Another guess, since you say the key is String (offline): you are not >> cloning the value of TagsWritable. Hadoop reuses the object under the >> hood, and so is changing your object value. You can't save references >> to the object you get from reading a SequenceFile. >> >> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote: >>> First guess: your key class does not implement hashCode/equals >>> >>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote: >>>> Hi everyone, >>>> >>>> I got a trouble these days,and I don't know whether it is a bug of >>>> spark.When I use GroupByKey for our sequenceFile Data,I find that >>>> different >>>> partition number lead different result, so as ReduceByKey. I think the >>>> problem happens on the shuffle stage.I read the source code, but still >>>> can't find the answer. >>>> >>>> >>>> this is the main code: >>>> >>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input, >>>> classOf[UserWritable], classOf[TagsWritable]) >>>> val combinedRdd = rdd.map(s => (s._1.getuserid(), >>>> s._2)).groupByKey(num).filter(_._1 == uid) >>>> >>>> num is the number of partition and uid is a filter id for result >>>> comparision. >>>> TagsWritable implements WritableComparable and Serializable. >>>> >>>> I used GroupByKey on text file, the result was right. >>>> >>>> Thanks, >>>> Devin Huang >>>> >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
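A minimal sketch of the copy Sean is describing, reusing the names from the original snippet (UserWritable, TagsWritable, input, num, uid). WritableUtils.clone makes a deep copy of a Hadoop Writable via its write/readFields methods, and the Configuration is created inside mapPartitions so the closure does not capture anything non-serializable:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableUtils

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
  classOf[UserWritable], classOf[TagsWritable])

val combinedRdd = rdd
  .mapPartitions { iter =>
    val conf = new Configuration()
    iter.map { case (user, tags) =>
      // The String key is immutable, but the TagsWritable instance is reused by
      // the record reader, so copy it out before it reaches the shuffle.
      (user.getuserid(), WritableUtils.clone(tags, conf))
    }
  }
  .groupByKey(num)
  .filter(_._1 == uid)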
Re: Different partition number of GroupByKey leads different result
Let me add: the problem is that groupByKey does not divide our sequence data into groups correctly, and produces wrong key/value pairs. The shuffle stage might not be executing correctly, and I don't know what causes this. The type of the key is String, and the type of the value is TagsWritable. Taking one user's data as an example: when the partition number is 300, the value for this user is 270102,1.00;130098967f,1.00;270027,1.00;270001,1.00. When the partition number is 100, the value for this user is 282133,1.00;150098921f,1.00; I suspect the wrong value is actually another user's value, i.e. the data gets mismatched at the shuffle stage. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989p24990.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
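One way to check the mismatch theory without eyeballing individual users is to compare per-key record counts under the two partition settings; the counts only involve the String keys, so if they agree while the grouped values differ, the corruption is in the values rather than in the shuffle routing. A rough sketch, again reusing the names from the original snippet:

// Count records per user id under both partition settings.
val counts300 = rdd.map(s => (s._1.getuserid(), 1L)).reduceByKey(_ + _, 300)
val counts100 = rdd.map(s => (s._1.getuserid(), 1L)).reduceByKey(_ + _, 100)

// Keys whose counts disagree between the two runs.
val mismatches = counts300.join(counts100).filter { case (_, (a, b)) => a != b }
println(mismatches.count())
// 0 here suggests the keys survive the shuffle and the problem is in the values.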
Re: Different partition number of GroupByKey leads different result
Another guess, since you say the key is String (offline): you are not cloning the value of TagsWritable. Hadoop reuses the object under the hood, and so is changing your object value. You can't save references to the object you get from reading a SequenceFile. On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote: > First guess: your key class does not implement hashCode/equals > > On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote: >> Hi everyone, >> >> I got a trouble these days,and I don't know whether it is a bug of >> spark.When I use GroupByKey for our sequenceFile Data,I find that different >> partition number lead different result, so as ReduceByKey. I think the >> problem happens on the shuffle stage.I read the source code, but still >> can't find the answer. >> >> >> this is the main code: >> >> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input, >> classOf[UserWritable], classOf[TagsWritable]) >> val combinedRdd = rdd.map(s => (s._1.getuserid(), >> s._2)).groupByKey(num).filter(_._1 == uid) >> >> num is the number of partition and uid is a filter id for result >> comparision. >> TagsWritable implements WritableComparable and Serializable. >> >> I used GroupByKey on text file, the result was right. >> >> Thanks, >> Devin Huang >> >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Different partition number of GroupByKey leads different result
First guess: your key class does not implement hashCode/equals On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote: > Hi everyone, > > I got a trouble these days,and I don't know whether it is a bug of > spark.When I use GroupByKey for our sequenceFile Data,I find that different > partition number lead different result, so as ReduceByKey. I think the > problem happens on the shuffle stage.I read the source code, but still > can't find the answer. > > > this is the main code: > > val rdd = sc.sequenceFile[UserWritable, TagsWritable](input, > classOf[UserWritable], classOf[TagsWritable]) > val combinedRdd = rdd.map(s => (s._1.getuserid(), > s._2)).groupByKey(num).filter(_._1 == uid) > > num is the number of partition and uid is a filter id for result > comparision. > TagsWritable implements WritableComparable and Serializable. > > I used GroupByKey on text file, the result was right. > > Thanks, > Devin Huang > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
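For readers wondering what this first guess means in practice: a custom key type only groups correctly if logically equal instances compare equal and hash identically, because the shuffle buckets records by the key's hashCode and then matches them with equals. An illustrative, hypothetical key wrapper (not the poster's actual UserWritable):

// Without the two overrides below, two instances holding the same userid
// would be treated as different keys by groupByKey.
class UserKey(val userid: String) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: UserKey => this.userid == that.userid
    case _             => false
  }
  override def hashCode: Int = userid.hashCode
}

A case class would get both methods for free, which is one reason groupByKey keys are often modelled that way.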
Re: how to handle OOMError from groupByKey
"Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]." Obvioulsy one of your key value pair is two large. You can try to increase spark.shuffle.memoryFraction. Are you sure you can't : partition your data by user/time-interval => process with a mapPartition => partition by user => process with a mapPartition Not efficient but if your operation decrease the amount of data per user it may work. 2015-09-29 0:17 GMT+08:00 Fabien Martin <fabien.marti...@gmail.com>: > You can try to reduce the number of containers in order to increase their > memory. > > 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>: > >> You can try to increase the number of partitions to get ride of the OOM >> errors. Also try to use reduceByKey instead of groupByKey. >> >> Thanks >> Best Regards >> >> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com> >> wrote: >> >>> Hi everyone, >>> I have an RDD of the format (user: String, timestamp: Long, state: >>> Boolean). My task invovles converting the states, where on/off is >>> represented as true/false, into intervals of 'on' of the format (beginTs: >>> Long, endTs: Long). So this task requires me, per user, to line up all of >>> the on/off states so that I can compute when it is on, since the >>> calculation is neither associative nor commutative. >>> >>> So there are 2 main operations that I'm trying to accomplish together: >>> 1. group by each user >>> 2. sort by time -- keep all of the states in sorted order by time >>> >>> The main code inside the method that does grouping by user and sorting >>> by time looks sort of looks like this: >>> >>> >>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long, >>> Boolean)] >>> val grouped = keyedStatesRDD.groupByKey >>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of >>> type RDD[(String, Iterable(Long, Boolean))] >>> // take the sequence of (ts, state) per user, sort, get intervals >>> val groupedIntervals = grouped.mapValues( >>> states => { >>> val sortedStates = states.toSeq.sortBy(_._1) >>> val intervals = DFUtil.statesToIntervals(sortedStates) >>> val intervalsList = bucketDurations.map{case(k,v) => >>> (k,v)}(collection.breakOut).sortBy(_._1) >>> intervalsList >>> } >>> ) >>> // after .mapValues, new format for RDD is (user, seq-of-(startTime, >>> endTime)) of type RDD[(String, IndexedSeq(Long, Long))] >>> >>> >>> When I run my Spark job with 1 day's worth of data, the job completes >>> successfully. When I run with 1 month's or 1 year's worth of data, that >>> method is where my Spark job consistently crashes with get >>> OutOfMemoryErrors. I need to run on the full year's worth of data. >>> >>> My suspicion is that the groupByKey is the problem (it's pulling all of >>> the matching data values into a single executor's heap as a plain Scala >>> Iterable). But alternatives of doing sortByKey on the RDD first before >>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't >>> quite apply in my scenario because my operation is not associative (can't >>> combine per-partition results) and I still need to group by users before >>> doing a foldLeft. 
>>> >>> I've definitely thought about the issue before and come across users >>> with issues that are similar but not exactly the same: >>> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html >>> >>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E >>> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html >>> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html >>> >>> And this Jira seems relevant too: >>> https://issues.apache.org/jira/browse/SPARK-3655 >>> >>> The amount of memory that I'm using is 2g per executor, and I can't go >>> higher than that because each executor gets a YARN container from nodes >>> with 16 GB of RAM and 5 YARN containers allowed per node. >>> >>> So I'd like to know if there's an easy solution to executing my logic on >>> my full dataset in Spark. >>> >>> Thanks! >>> >>> -- Elango >>> >> >> > -- Alexis GILLAIN
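Two quick sketches of the suggestions above. The first is just a configuration change (a Spark 1.x setting whose default is 0.2); the second is one reading of the partition-then-mapPartitions idea, using the keyedStatesRDD from the original post:

import org.apache.spark.{HashPartitioner, SparkConf}

// 1) Raise the shuffle memory fraction. It must be set before the SparkContext
//    is created, or passed as --conf spark.shuffle.memoryFraction=0.4 on spark-submit.
val conf = new SparkConf().set("spark.shuffle.memoryFraction", "0.4")

// 2) Shuffle once so all records for a user land in the same partition, then build
//    each user's sorted sequence locally. This still materializes one partition at a
//    time, so it only helps if no single partition (or user) dominates the data.
val grouped = keyedStatesRDD
  .partitionBy(new HashPartitioner(200))        // 200 is an arbitrary example value
  .mapPartitions { iter =>
    iter.toVector.groupBy(_._1).iterator.map { case (user, recs) =>
      (user, recs.map(_._2).sortBy(_._1))       // (ts, state) pairs sorted by ts
    }
  }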
Re: how to handle OOMError from groupByKey
You can try to reduce the number of containers in order to increase their memory. 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>: > You can try to increase the number of partitions to get ride of the OOM > errors. Also try to use reduceByKey instead of groupByKey. > > Thanks > Best Regards > > On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com> > wrote: > >> Hi everyone, >> I have an RDD of the format (user: String, timestamp: Long, state: >> Boolean). My task invovles converting the states, where on/off is >> represented as true/false, into intervals of 'on' of the format (beginTs: >> Long, endTs: Long). So this task requires me, per user, to line up all of >> the on/off states so that I can compute when it is on, since the >> calculation is neither associative nor commutative. >> >> So there are 2 main operations that I'm trying to accomplish together: >> 1. group by each user >> 2. sort by time -- keep all of the states in sorted order by time >> >> The main code inside the method that does grouping by user and sorting by >> time looks sort of looks like this: >> >> >> // RDD starts off in format (user, ts, state) of type RDD[(String, Long, >> Boolean)] >> val grouped = keyedStatesRDD.groupByKey >> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of >> type RDD[(String, Iterable(Long, Boolean))] >> // take the sequence of (ts, state) per user, sort, get intervals >> val groupedIntervals = grouped.mapValues( >> states => { >> val sortedStates = states.toSeq.sortBy(_._1) >> val intervals = DFUtil.statesToIntervals(sortedStates) >> val intervalsList = bucketDurations.map{case(k,v) => >> (k,v)}(collection.breakOut).sortBy(_._1) >> intervalsList >> } >> ) >> // after .mapValues, new format for RDD is (user, seq-of-(startTime, >> endTime)) of type RDD[(String, IndexedSeq(Long, Long))] >> >> >> When I run my Spark job with 1 day's worth of data, the job completes >> successfully. When I run with 1 month's or 1 year's worth of data, that >> method is where my Spark job consistently crashes with get >> OutOfMemoryErrors. I need to run on the full year's worth of data. >> >> My suspicion is that the groupByKey is the problem (it's pulling all of >> the matching data values into a single executor's heap as a plain Scala >> Iterable). But alternatives of doing sortByKey on the RDD first before >> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't >> quite apply in my scenario because my operation is not associative (can't >> combine per-partition results) and I still need to group by users before >> doing a foldLeft. >> >> I've definitely thought about the issue before and come across users with >> issues that are similar but not exactly the same: >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html >> >> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html >> >> And this Jira seems relevant too: >> https://issues.apache.org/jira/browse/SPARK-3655 >> >> The amount of memory that I'm using is 2g per executor, and I can't go >> higher than that because each executor gets a YARN container from nodes >> with 16 GB of RAM and 5 YARN containers allowed per node. 
>> >> So I'd like to know if there's an easy solution to executing my logic on >> my full dataset in Spark. >> >> Thanks! >> >> -- Elango >> > >
Re: how to handle OOMError from groupByKey
You can try to increase the number of partitions to get ride of the OOM errors. Also try to use reduceByKey instead of groupByKey. Thanks Best Regards On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com> wrote: > Hi everyone, > I have an RDD of the format (user: String, timestamp: Long, state: > Boolean). My task invovles converting the states, where on/off is > represented as true/false, into intervals of 'on' of the format (beginTs: > Long, endTs: Long). So this task requires me, per user, to line up all of > the on/off states so that I can compute when it is on, since the > calculation is neither associative nor commutative. > > So there are 2 main operations that I'm trying to accomplish together: > 1. group by each user > 2. sort by time -- keep all of the states in sorted order by time > > The main code inside the method that does grouping by user and sorting by > time looks sort of looks like this: > > > // RDD starts off in format (user, ts, state) of type RDD[(String, Long, > Boolean)] > val grouped = keyedStatesRDD.groupByKey > // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type > RDD[(String, Iterable(Long, Boolean))] > // take the sequence of (ts, state) per user, sort, get intervals > val groupedIntervals = grouped.mapValues( > states => { > val sortedStates = states.toSeq.sortBy(_._1) > val intervals = DFUtil.statesToIntervals(sortedStates) > val intervalsList = bucketDurations.map{case(k,v) => > (k,v)}(collection.breakOut).sortBy(_._1) > intervalsList > } > ) > // after .mapValues, new format for RDD is (user, seq-of-(startTime, > endTime)) of type RDD[(String, IndexedSeq(Long, Long))] > > > When I run my Spark job with 1 day's worth of data, the job completes > successfully. When I run with 1 month's or 1 year's worth of data, that > method is where my Spark job consistently crashes with get > OutOfMemoryErrors. I need to run on the full year's worth of data. > > My suspicion is that the groupByKey is the problem (it's pulling all of > the matching data values into a single executor's heap as a plain Scala > Iterable). But alternatives of doing sortByKey on the RDD first before > grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't > quite apply in my scenario because my operation is not associative (can't > combine per-partition results) and I still need to group by users before > doing a foldLeft. > > I've definitely thought about the issue before and come across users with > issues that are similar but not exactly the same: > > http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html > > http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E > > http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html > > http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html > > And this Jira seems relevant too: > https://issues.apache.org/jira/browse/SPARK-3655 > > The amount of memory that I'm using is 2g per executor, and I can't go > higher than that because each executor gets a YARN container from nodes > with 16 GB of RAM and 5 YARN containers allowed per node. > > So I'd like to know if there's an easy solution to executing my logic on > my full dataset in Spark. > > Thanks! > > -- Elango >
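The first part of this suggestion is just a matter of passing an explicit partition count to the shuffle; a one-line sketch against the keyedStatesRDD from the original post (400 is an arbitrary example value):

// More, smaller shuffle partitions: each task then holds fewer users' data at once,
// although a single very large key can still blow up one task.
val grouped = keyedStatesRDD.groupByKey(400)

The reduceByKey part is harder to apply here, since the poster's per-user computation is order-dependent and not associative.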
how to handle OOMError from groupByKey
Hi everyone, I have an RDD of the format (user: String, timestamp: Long, state: Boolean). My task invovles converting the states, where on/off is represented as true/false, into intervals of 'on' of the format (beginTs: Long, endTs: Long). So this task requires me, per user, to line up all of the on/off states so that I can compute when it is on, since the calculation is neither associative nor commutative. So there are 2 main operations that I'm trying to accomplish together: 1. group by each user 2. sort by time -- keep all of the states in sorted order by time The main code inside the method that does grouping by user and sorting by time looks sort of looks like this: // RDD starts off in format (user, ts, state) of type RDD[(String, Long, Boolean)] val grouped = keyedStatesRDD.groupByKey // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type RDD[(String, Iterable(Long, Boolean))] // take the sequence of (ts, state) per user, sort, get intervals val groupedIntervals = grouped.mapValues( states => { val sortedStates = states.toSeq.sortBy(_._1) val intervals = DFUtil.statesToIntervals(sortedStates) val intervalsList = bucketDurations.map{case(k,v) => (k,v)}(collection.breakOut).sortBy(_._1) intervalsList } ) // after .mapValues, new format for RDD is (user, seq-of-(startTime, endTime)) of type RDD[(String, IndexedSeq(Long, Long))] When I run my Spark job with 1 day's worth of data, the job completes successfully. When I run with 1 month's or 1 year's worth of data, that method is where my Spark job consistently crashes with get OutOfMemoryErrors. I need to run on the full year's worth of data. My suspicion is that the groupByKey is the problem (it's pulling all of the matching data values into a single executor's heap as a plain Scala Iterable). But alternatives of doing sortByKey on the RDD first before grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't quite apply in my scenario because my operation is not associative (can't combine per-partition results) and I still need to group by users before doing a foldLeft. I've definitely thought about the issue before and come across users with issues that are similar but not exactly the same: http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html And this Jira seems relevant too: https://issues.apache.org/jira/browse/SPARK-3655 The amount of memory that I'm using is 2g per executor, and I can't go higher than that because each executor gets a YARN container from nodes with 16 GB of RAM and 5 YARN containers allowed per node. So I'd like to know if there's an easy solution to executing my logic on my full dataset in Spark. Thanks! -- Elango
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Please find fix info for users who are following the mail chain of this issue and the respective solution below: *reduceByKey: Non working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array() is empty *reduceByKey: Working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf).set(spark.driver.allowMultipleContexts,true) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array((0,3),(1,5),(2,4)) Regards, Satish Chandra On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Currently using DSE 4.7 and Spark 1.2.2 version Regards, Satish On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote: What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.spark val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs.reduceByKey((x,y) = x + y).collect yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 Spark context available as sc. SQL context available as sqlContext. Loading test.spark... pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:21 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong -- Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
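For anyone trying to reproduce the snippets above: as posted they will not compile (the import should be org.apache.spark.SparkContext rather than org.apache.spark.Context, the RDD is built from sc rather than SC, and .set(...) belongs on the SparkConf, not on the SparkContext). A corrected sketch of the reported working variant:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")                                 // or pass --master on the command line
  .setAppName("reduceByKey-check")                       // illustrative name
  .set("spark.driver.allowMultipleContexts", "true")     // the workaround reported above
val sc = new SparkContext(conf)

val dataRDD = sc.makeRDD(Seq((0, 1), (0, 2), (1, 2), (1, 3), (2, 4)))
dataRDD.reduceByKey(_ + _).collect().foreach(println)
// expected: (0,3), (1,5), (2,4)

Since spark-shell (and dse spark) already provides a SparkContext named sc, a script run with -i that creates its own context is presumably why the allowMultipleContexts flag makes a difference here.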
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Currently using DSE 4.7 and Spark 1.2.2 version Regards, Satish On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote: What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.spark val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs.reduceByKey((x,y) = x + y).collect yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 Spark context available as sc. SQL context available as sqlContext. Loading test.spark... pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:21 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong -- Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
RE: Transformation not happening for reduceByKey or GroupByKey
I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap spark-shell command, underline it is just invoking spark-shell. I don't know too much about the original problem though. Yong Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin,Yes, it is DSE but issue is related to Spark only Regards,Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin,Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collectres43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards,Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collectres43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All,I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code:RDD.reduceByKey((x,y) = x+y)RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards,Satish -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Any inputs for the actual problem statement Regards, Satish On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote: Yong, Thanks for your reply. I tried spark-shell -i script-file, it works fine for me. Not sure the different with dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote: I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap spark-shell command, underline it is just invoking spark-shell. I don't know too much about the original problem though. Yong -- Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
You had:

RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Maybe try:

val rdd2 = RDD.reduceByKey((x,y) => x+y)
rdd2.take(3)

-Abhishek-

On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD: Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on the values for each key. Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at console:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
RE: Transformation not happening for reduceByKey or GroupByKey
What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.sparkval pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs.reduceByKey((x,y) = x + y).collectyzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.sparkWelcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4Spark context available as sc.SQL context available as sqlContext.Loading test.spark...pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:2115/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yesres0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards,Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
Yes, DSE 4.7 Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI Robin, Yes, it is DSE but issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Transformation not happening for reduceByKey or GroupByKey
HI All, I have data in RDD as mentioned below: RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on Values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Could anybody let me know what is that i missing here, it should work as its a basic transformation Please let me know if any additional information required Regards, Satish On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Want to avoid groupByKey as its running for ever
I have a RDD of type (String, Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord, com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])] Here String is Key and a list of tuples for that key. I got above RDD after doing a groupByKey. I later want to compute total number of values for a given key and total number of unique values for the same given key and hence i do this val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong How do i do this using reduceByKey. *Total Code:* val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] = detailInputsToGroup.map { case (detailInput, dataRecord) = val key: StringBuilder = new StringBuilder dimensions.foreach { dimension = key ++= { Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString } } (key.toString, (detailInput, dataRecord)) }.groupByKey groupedDetail.map { case (key, values) = { val valueList = values.toList //Compute dimensions // You can skup this val (detailInput, dataRecord) = valueList.head val schema = SchemaUtil.outputSchema(_detail) val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema)) DataUtil.populateDimensions(schema, dimensions.toArray, detailInput, dataRecord, detailOutput) val metricsData = metricProviders.flatMap { case (className, instance) = val data = instance.getMetrics(valueList) ReflectionUtil.getData(data, _metricProviderMemberNames(className)) } metricsData.map { case (k, v) = detailOutput.put(k, v) } val wrap = new AvroKey[DetailOutputRecord](detailOutput) (wrap, NullWritable.get) } } //getMetrics: def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = { val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong new ViewItemCountMetric(totalViCount, uniqueViCount) } I understand that totalViCount can be implemented using reduceByKey. How can i implement total unique count as i need to have the full list to know the unique values. -- Deepak
Re: Want to avoid groupByKey as its running for ever
If the number of items is very large, have you considered using probabilistic counting? The HyperLogLogPlus https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java class from stream-lib https://github.com/addthis/stream-lib might be suitable. On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a RDD of type (String, Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord, com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])] Here String is Key and a list of tuples for that key. I got above RDD after doing a groupByKey. I later want to compute total number of values for a given key and total number of unique values for the same given key and hence i do this val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong How do i do this using reduceByKey. *Total Code:* val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] = detailInputsToGroup.map { case (detailInput, dataRecord) = val key: StringBuilder = new StringBuilder dimensions.foreach { dimension = key ++= { Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString } } (key.toString, (detailInput, dataRecord)) }.groupByKey groupedDetail.map { case (key, values) = { val valueList = values.toList //Compute dimensions // You can skup this val (detailInput, dataRecord) = valueList.head val schema = SchemaUtil.outputSchema(_detail) val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema)) DataUtil.populateDimensions(schema, dimensions.toArray, detailInput, dataRecord, detailOutput) val metricsData = metricProviders.flatMap { case (className, instance) = val data = instance.getMetrics(valueList) ReflectionUtil.getData(data, _metricProviderMemberNames(className)) } metricsData.map { case (k, v) = detailOutput.put(k, v) } val wrap = new AvroKey[DetailOutputRecord](detailOutput) (wrap, NullWritable.get) } } //getMetrics: def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = { val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong new ViewItemCountMetric(totalViCount, uniqueViCount) } I understand that totalViCount can be implemented using reduceByKey. How can i implement total unique count as i need to have the full list to know the unique values. -- Deepak
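To make the suggestion concrete, here is a hedged sketch of combining a probabilistic counter per key without a groupByKey. It assumes the stream-lib jar is on the classpath; makeKey and itemIdOf are illustrative stand-ins for the key-building and itemId-extraction logic in the code above, not real functions from it:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// (key, itemId) pairs; one HyperLogLogPlus plus a running total per key.
val perKey = detailInputsToGroup
  .map { case (detailInput, dataRecord) => (makeKey(detailInput, dataRecord), itemIdOf(detailInput)) }
  .combineByKey(
    (id: Long) => { val h = new HyperLogLogPlus(14); h.offer(id); (1L, h) },                  // create combiner
    (acc: (Long, HyperLogLogPlus), id: Long) => { acc._2.offer(id); (acc._1 + 1L, acc._2) },  // add one value
    (a: (Long, HyperLogLogPlus), b: (Long, HyperLogLogPlus)) => { a._2.addAll(b._2); (a._1 + b._1, a._2) }) // merge partitions
val counts = perKey.mapValues { case (total, hll) => (total, hll.cardinality()) }  // (totalViCount, approx uniqueViCount)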
Re: Want to avoid groupByKey as its running for ever
I modified to detailInputsToGroup.map { case (detailInput, dataRecord) = val key: StringBuilder = new StringBuilder dimensions.foreach { dimension = key ++= { Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString } } (key.toString, (detailInput, dataRecord)) }.reduceByKey { case (v1, v2) = { val v1Detail = v1._1 val v2Detail = v2._1 val v1Data = v1._2 val v2Data = v2._2 * val totalViCount = Option(v1Data.get(totalViCount).asInstanceOf[Int]).getOrElse(0)* * v1Data.getRecord.put(totalViCount, totalViCount + 1)* (v1) } }.map { case (k, v) = { val schema = SchemaUtil.outputSchema(_detail) val detailOutputRecord = new DetailOutputRecord(detail, new SessionRecord(schema)) //Compute dimensions DataUtil.populateDimensions(schema, dimensions.toArray, v._1, v._2, detailOutputRecord) //Construct Output val wrap = new AvroKey[DetailOutputRecord](detailOutputRecord) (wrap, NullWritable.get) } } How do i compute unique count ? On Tue, Jun 30, 2015 at 12:04 PM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: If the number of items is very large, have you considered using probabilistic counting? The HyperLogLogPlus https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java class from stream-lib https://github.com/addthis/stream-lib might be suitable. On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a RDD of type (String, Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord, com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])] Here String is Key and a list of tuples for that key. I got above RDD after doing a groupByKey. I later want to compute total number of values for a given key and total number of unique values for the same given key and hence i do this val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong How do i do this using reduceByKey. *Total Code:* val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] = detailInputsToGroup.map { case (detailInput, dataRecord) = val key: StringBuilder = new StringBuilder dimensions.foreach { dimension = key ++= { Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString } } (key.toString, (detailInput, dataRecord)) }.groupByKey groupedDetail.map { case (key, values) = { val valueList = values.toList //Compute dimensions // You can skup this val (detailInput, dataRecord) = valueList.head val schema = SchemaUtil.outputSchema(_detail) val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema)) DataUtil.populateDimensions(schema, dimensions.toArray, detailInput, dataRecord, detailOutput) val metricsData = metricProviders.flatMap { case (className, instance) = val data = instance.getMetrics(valueList) ReflectionUtil.getData(data, _metricProviderMemberNames(className)) } metricsData.map { case (k, v) = detailOutput.put(k, v) } val wrap = new AvroKey[DetailOutputRecord](detailOutput) (wrap, NullWritable.get) } } //getMetrics: def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = { val totalViCount = details.size.toLong val uniqueViCount = details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong new ViewItemCountMetric(totalViCount, uniqueViCount) } I understand that totalViCount can be implemented using reduceByKey. 
How can i implement total unique count as i need to have the full list to know the unique values. -- Deepak -- Deepak
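If the number of distinct itemIds per key is modest, an exact alternative is aggregateByKey with a (count, set) accumulator. A hedged sketch, again with makeKey and itemIdOf standing in for the existing key and itemId logic from the code above:

val counts = detailInputsToGroup
  .map { case (detailInput, dataRecord) => (makeKey(detailInput, dataRecord), itemIdOf(detailInput)) }
  .aggregateByKey((0L, Set.empty[Long]))(
    (acc, id) => (acc._1 + 1L, acc._2 + id),        // fold one record into the per-partition accumulator
    (a, b)    => (a._1 + b._1, a._2 ++ b._2))       // merge accumulators across partitions
  .mapValues { case (total, ids) => (total, ids.size.toLong) }   // (totalViCount, uniqueViCount)

Unlike groupByKey, both sketches combine map-side, so only small accumulators are shuffled rather than the full list of (DetailInputRecord, DataRecord) pairs per key.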
Re: workaround for groupByKey
It all depends on what it is you need to do with the pages. If you’re just going to be collecting them then it’s really not much different than a groupByKey. If instead you’re looking to derive some other value from the series of pages then you could potentially partition by user id and run a mapPartitions or one of the other combineByKey APIs? From: Jianguo Li Date: Tuesday, June 23, 2015 at 9:46 AM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the record by user id. However, I have millions of users, do you think partition by user id will help? Jianguo On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
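For completeness, a hedged sketch of the partition-by-user-id idea mentioned above: partitionBy moves all of a user's records into one partition in a single shuffle, and mapPartitions then groups locally without a second shuffle. The data and partition count are illustrative only:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark versions

val visits = sc.parallelize(Seq(("u1", "/a"), ("u1", "/b"), ("u2", "/a")))  // (userId, url)
val byUser = visits.partitionBy(new HashPartitioner(200))
val urlsPerUser = byUser.mapPartitions { it =>
  // every record for a given user is already in this partition, so grouping is local;
  // note this still materializes one partition at a time in memory
  it.toSeq.groupBy(_._1).iterator.map { case (user, pairs) => (user, pairs.map(_._2)) }
}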
Re: workaround for groupByKey
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the record by user id. However, I have millions of users, do you think partition by user id will help? Jianguo On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking *We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner* It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
workaround for groupByKey
Hi, I am processing an RDD of key-value pairs. The key is a user_id, and the value is a website URL the user has visited. Since I need to know all the urls each user has visited, I am tempted to call groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is a user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
Re: workaround for groupByKey
There is reduceByKey that works on K,V. You need to accumulate partial results and proceed. does your computation allow that ? On Mon, Jun 22, 2015 at 2:12 PM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo -- Deepak
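As a small illustration of "accumulate partial results": reduceByKey always combines two values of the same type, so to collect values the value type itself has to be an accumulator. A minimal sketch with made-up data:

val visits = sc.parallelize(Seq(("u1", "/a"), ("u1", "/b"), ("u2", "/a")))  // (userId, url)
val urlsPerUser = visits
  .mapValues(url => Set(url))     // promote each value to a one-element accumulator
  .reduceByKey(_ ++ _)            // merge partial sets; map-side combining still applies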
Re: workaround for groupByKey
Silvio, Suppose my RDD is (K-1, v1,v2,v3,v4). If i want to do simple addition i can use reduceByKey or aggregateByKey. What if my processing needs to check all the items in the value list each time, Above two operations do not get all the values, they just get two pairs (v1, v2) , you do some processing and store it back into v1. How do i use the combiner facility present with reduceByKey aggregateByKey. -deepak On Mon, Jun 22, 2015 at 2:43 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo -- Deepak
Re: workaround for groupByKey
You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
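A self-contained version of the aggregateByKey snippet quoted in this thread, for anyone who wants to paste it into a shell; the data is illustrative:

import scala.collection.mutable.ListBuffer

val input = sc.parallelize(Seq((1, "/a"), (1, "/b"), (2, "/c")))   // (userId, url)
val test = input.aggregateByKey(ListBuffer.empty[String])(
  (buf, url) => buf += url,      // add one url to the per-partition buffer
  (a, b)     => a ++= b)         // merge buffers from different partitions
test.collect()                   // e.g. Array((1, ListBuffer(/a, /b)), (2, ListBuffer(/c)))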
Re: Creating RDD from Iterable from groupByKey results
I updated code sample so people can understand better what are my inputs and outputs.
Creating RDD from Iterable from groupByKey results
I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with few keys but each key has a large number (about 100k) of values. I want to somehow repartition, make each `Iterable<v>` into RDD[v] so that I can further apply map, reduce, sortBy etc effectively on those values. I am sensing flatMapValues is my friend but want to check with other sparkens. This is for a real-time spark app. I have already tried collect() and computing all measures in-memory on the app server but am trying to improve upon it. This is what I try (pseudo): class ComputeMetrices{ transient JavaSparkContext sparkContext; public Map<String, V> computeMetrices(JavaPairRdd javaPairRdd) { javaPairRdd.groupByKey(10).mapValues(itr -> { sparContext.parallelize(list(itr)) //null pointer ; probably at sparkContext }) } } I want to create an RDD out of that Iterable from groupByKey result so that I can use further spark transformations. Thanks Nir
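One thing worth noting for this question: an RDD cannot be created inside another RDD's closure, because the SparkContext exists only on the driver (hence the null pointer in the pseudo-code). A hedged Scala sketch of the usual alternative, working on the grouped Iterable directly with plain collections; data and measures are illustrative:

val pairs = sc.parallelize(Seq(("k1", 3), ("k1", 1), ("k2", 7)))
val grouped = pairs.groupByKey(10)
val metrics = grouped.mapValues { values =>
  val sorted = values.toSeq.sorted          // ordinary Scala collection work, runs on the executor
  (sorted.size, sorted.sum, sorted.max)     // whatever per-key measures are needed
}
metrics.collect()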
Spark groupByKey, does it always create at least 1 partition per key?
I am currently using spark streaming. During my batch processing I must groupByKey. Afterwards I call foreachRDD foreachPartition write to an external datastore. My only concern with this is if it's future proof? I know groupByKey by default uses the hashPartitioner. I have printed out the internals of partitions and loaded large text files into memory and ran groupByKey just to make sure. I have two questions. #1 First will my implementation ever break in the future? Will partitions groupByKey work differently? #2 Is it possible for a (key,values) to exist on more than 1 partition after using groupByKey. Notes: I'm aware groupByKey, is not very efficient. However I am not working with large amounts of data can process batches very quickly. Below I could have used aggregateByKey because I printed the sum, however my real implementation is much different and I do need each value for each key I can not reduce the data. 1 Million line test log file Partition HashCode: 965943941 Key:lol Size:2346 Partition HashCode: 1605678983 Key:ee Size:4692 Partition HashCode: 1605678983 Key:aa Size:32844 Partition HashCode: 1605678983 Key:gg Size:4692 Partition HashCode: 1605678983 Key:dd Size:11730 Partition HashCode: 1605678983 Key:hh Size:4692 Partition HashCode: 1605678983 Key:kk Size:2346 Partition HashCode: 1605678983 Key:tt Size:4692 Partition HashCode: 1605678983 Key:ff Size:2346 Partition HashCode: 1605678983 Key:bb Size:18768 Partition HashCode: 1605678983 Key:cc Size:14076 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-groupByKey-does-it-always-create-at-least-1-partition-per-key-tp22938.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
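Regarding question #2 in this post: with the default HashPartitioner every record of a given key is hashed to exactly one partition, so a (key, values) group cannot span partitions after groupByKey. A small hedged sketch to check this empirically; the data and partition count are illustrative:

import org.apache.spark.HashPartitioner

val words = sc.parallelize(Seq("aa", "aa", "bb", "cc", "aa")).map(w => (w, 1))
val grouped = words.groupByKey(new HashPartitioner(4))
grouped.mapPartitionsWithIndex { (idx, it) =>
  it.map { case (k, vs) => s"partition=$idx key=$k size=${vs.size}" }
}.collect().foreach(println)    // each key shows up in exactly one partition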
Re: Using groupByKey with Spark SQL
Perhaps you are looking for GROUP BY and collect_set, which would allow you to stay in SQL. I'll add that in Spark 1.4 you can get access to items of a row by name. On Fri, May 15, 2015 at 10:48 AM, Edward Sargisson ejsa...@gmail.com wrote: Hi all, This might be a question to be answered or feedback for a possibly new feature depending: We have source data which is events about the state changes of an entity (identified by an ID) represented as nested JSON. We wanted to sessionize this data so that we had a collection of all the events for a given ID as we have to do more processing based on what we find. We tried doing this using Spark SQL and then converting to a JavaPairRDD using DataFrame.javaRdd.groupByKey. The schema inference worked great but what was frustrating was that the result of groupByKey is JavaPairRDD<String, Iterable<Row>>. Rows only have get(int) methods and don't take notice of the schema stuff so they ended up being something we didn't want to work with. We are currently solving this problem by ignoring Spark SQL and deserializing the event JSON into a POJO for further processing. Are there better approaches to this? Perhaps Spark should have a DataFrame.groupByKey that returns Rows that can be used with the schema stuff? Thanks! Edward
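A hedged sketch of the GROUP BY + collect_set approach being suggested, for anyone who wants to try it. The file, table and column names are made up, a Spark 1.4-style DataFrameReader is assumed, and a HiveContext is used because collect_set is a Hive UDAF in that version:

val df = hiveContext.read.json("events.json")        // nested JSON events with "id" and "state" fields
df.registerTempTable("events")
val sessions = hiveContext.sql(
  "SELECT id, collect_set(state) AS states FROM events GROUP BY id")
sessions.show()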
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Should I repost this to dev list ?
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
If you return an iterable, you are not tying the API to a CompactBuffer. Someday, the data could be fetched lazily and the API would not have to change. On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote: I wasn't involved in this decision (I just make the fries), but CompactBuffer is designed for relatively small data sets that at least fit in memory. It's more or less an Array. In principle, returning an iterator could hide the actual data structure that might be needed to hold a much bigger data set, if necessary. HOWEVER, it actually returns a CompactBuffer. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ?
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
because CompactBuffer is considered an implementation detail. It is also not public for the same reason. On Thu, Apr 23, 2015 at 6:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
I wasn't involved in this decision (I just make the fries), but CompactBuffer is designed for relatively small data sets that at least fit in memory. It's more or less an Array. In principle, returning an iterator could hide the actual data structure that might be needed to hold a much bigger data set, if necessary. HOWEVER, it actually returns a CompactBuffer. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Hi, Just a quick question, Regarding the source code of groupByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453 In the end, it casts CompactBuffer to Iterable. But why? Any advantage? Thank you. Hao.
Re: GroupByKey causing problem
Hi Tushar, The most scalable option is probably for you to consider doing some approximation. Eg., sample the first to come up with the bucket boundaries. Then you can assign data points to buckets without needing to do a full groupByKey. You could even have more passes which corrects any errors in your approximation (eg., see how sortByKey() works, and how it samples the underlying RDD when constructing the RangePartitioner). Though its more passes through the data, it will probably be much faster since you avoid the expensive groupByKey() Imran On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma tushars...@gmail.com wrote: Hi, I am trying to apply binning to a large CSV dataset. Here are the steps I am taking: 1. Emit each value of CSV as (ColIndex,(RowIndex,value)) 2. Then I groupByKey (here ColumnIndex) and get all values of a particular index to one node, as I have to work on the collection of all values 3. I apply my binning algorithm which is as follows: a. Sort the values b. Iterate through values and see if it is different than the previous one if no then add it to the same bin if yes then check the size of that bin, if it is greater than a particular size (say 5% of wholedataset) then change the bin number, else keep the same bin c. repeat for each column Due to this algorithm I can't calculate it partition wise and merge for final result. But even for groupByKey I expect it should work , maybe slowly, but it should finish. I increased the partition to reduce the output of each groupByKey so that it helps in successful completion of the process. But even with that it is stuck at the same stage. The log for executor says: ExternalMapAppendOnly(splilling to disk) (Trying ...) The code works for small CSV files but can't complete for big files. val inputfile = hdfs://hm41:9000/user/file1 val table = sc.textFile(inputfile,1000) val withoutHeader: RDD[String] = dropHeader(table) val kvPairs = withoutHeader.flatMap(retAtrTuple) //val filter_na = kvPairs.map{case (x,y) = (x,if(y == NA) else y)} val isNum = kvPairs.map{case (x,y) = (x,isNumeric(y))}.reduceByKey(__) val numeric_indexes = isNum.filter{case (x,y) = y}.sortByKey().map{case (x,y) = x}.collect() //val isNum_Arr = isNum.sortByKey().collect() val kvidx = withoutHeader.zipWithIndex //val t = kvidx.map{case (a,b) = retAtrTuple(a).map(x =(x,b)) } val t = kvidx.flatMap{case (a,b) = retAtrTuple(a).map(x =(x,b)) } val t2 = t.filter{case (a,b) = numeric_indexes contains a._1 } //val t2 = t.filter{case (a,b) = a._1 ==0 } val t3 = t2.map{case ((a,b),c) = (a,(c,b.toDouble))} //val t4 = t3.sortBy(_._2._1) val t4 = t3.groupByKey.map{case (a,b) = (a,classing_summary(b.toArray.sortBy(_._2)))} def dropHeader(data: RDD[String]): RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = { if (idx == 0) { lines.drop(1) } lines }) } def retAtrTuple(x: String) = { val newX = x.split(',') for (h - 0 until newX.length) yield (h, newX(h)) } def isNumeric(s: String): Boolean = { (allCatch opt s.toDouble).isDefined } def classing_summary(arr: Array[(Long, Double)]) = { var idx = 0L var value = 0.0 var prevValue = Double.MinValue var counter = 1 var classSize = 0.0 var size = arr.length val output = for(i - 0 until arr.length) yield { idx = arr(i)._1; value = arr(i)._2; if(value==prevValue){ classSize+=1.0/size; //println(both values same) //println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } else if(classSize(0.05)){ classSize+=1.0/size; //println(both values not same, adding to present bucket) 
//println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } else { classSize = 1.0/size; counter +=1; //println(both values not same, adding to different bucket) //println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } } output.toArray } Thanks in advance, Tushar Sharma
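To make the sampling idea concrete, here is a hedged sketch built on the t3 RDD[(colIndex, (rowIndex, value))] from the code above: estimate per-column boundaries from a small sample collected to the driver, broadcast them, and assign bins with a plain map, so no groupByKey is needed. This is an approximation of the original equal-frequency binning; the sample fraction and the 20-bucket choice (mirroring the 5% class size) are illustrative only:

val sampled = t3.sample(withReplacement = false, fraction = 0.01).collect()
val boundaries: Map[Int, Array[Double]] = sampled
  .groupBy(_._1)
  .map { case (col, rows) =>
    val sorted = rows.map(_._2._2).sorted
    col -> (1 until 20).map(i => sorted(i * sorted.length / 20)).toArray   // approx 5% quantiles per column
  }
val bcast = sc.broadcast(boundaries)
val binned = t3.map { case (col, (rowIdx, value)) =>
  val bin = bcast.value.getOrElse(col, Array.empty[Double]).count(_ <= value)   // bin index for this value
  (col, rowIdx, value, bin)
}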
GroupByKey causing problem
Hi, I am trying to apply binning to a large CSV dataset. Here are the steps I am taking: 1. Emit each value of CSV as (ColIndex,(RowIndex,value)) 2. Then I groupByKey (here ColumnIndex) and get all values of a particular index to one node, as I have to work on the collection of all values 3. I apply my binning algorithm which is as follows: a. Sort the values b. Iterate through values and see if it is different than the previous one if no then add it to the same bin if yes then check the size of that bin, if it is greater than a particular size (say 5% of wholedataset) then change the bin number, else keep the same bin c. repeat for each column Due to this algorithm I can't calculate it partition wise and merge for final result. But even for groupByKey I expect it should work , maybe slowly, but it should finish. I increased the partition to reduce the output of each groupByKey so that it helps in successful completion of the process. But even with that it is stuck at the same stage. The log for executor says: ExternalMapAppendOnly(splilling to disk) (Trying ...) The code works for small CSV files but can't complete for big files. val inputfile = hdfs://hm41:9000/user/file1 val table = sc.textFile(inputfile,1000) val withoutHeader: RDD[String] = dropHeader(table) val kvPairs = withoutHeader.flatMap(retAtrTuple) //val filter_na = kvPairs.map{case (x,y) = (x,if(y == NA) else y)} val isNum = kvPairs.map{case (x,y) = (x,isNumeric(y))}.reduceByKey(__) val numeric_indexes = isNum.filter{case (x,y) = y}.sortByKey().map{case (x,y) = x}.collect() //val isNum_Arr = isNum.sortByKey().collect() val kvidx = withoutHeader.zipWithIndex //val t = kvidx.map{case (a,b) = retAtrTuple(a).map(x =(x,b)) } val t = kvidx.flatMap{case (a,b) = retAtrTuple(a).map(x =(x,b)) } val t2 = t.filter{case (a,b) = numeric_indexes contains a._1 } //val t2 = t.filter{case (a,b) = a._1 ==0 } val t3 = t2.map{case ((a,b),c) = (a,(c,b.toDouble))} //val t4 = t3.sortBy(_._2._1) val t4 = t3.groupByKey.map{case (a,b) = (a,classing_summary(b.toArray.sortBy(_._2)))} def dropHeader(data: RDD[String]): RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = { if (idx == 0) { lines.drop(1) } lines }) } def retAtrTuple(x: String) = { val newX = x.split(',') for (h - 0 until newX.length) yield (h, newX(h)) } def isNumeric(s: String): Boolean = { (allCatch opt s.toDouble).isDefined } def classing_summary(arr: Array[(Long, Double)]) = { var idx = 0L var value = 0.0 var prevValue = Double.MinValue var counter = 1 var classSize = 0.0 var size = arr.length val output = for(i - 0 until arr.length) yield { idx = arr(i)._1; value = arr(i)._2; if(value==prevValue){ classSize+=1.0/size; //println(both values same) //println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } else if(classSize(0.05)){ classSize+=1.0/size; //println(both values not same, adding to present bucket) //println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } else { classSize = 1.0/size; counter +=1; //println(both values not same, adding to different bucket) //println(idx,value,classSize,counter,classSize); prevValue = value; (idx,value,counter,classSize); } } output.toArray } Thanks in advance, Tushar Sharma
A spark join and groupbykey that is making my containers on EC2 go over their memory limits
Hello, I have many questions about joins, but arguably just one. specifically about memory and containers that are overstepping their limits, as per errors dotted around all over the place, but something like: http://mail-archives.apache.org/mod_mbox/spark-issues/201405.mbox/%3CJIRA.12716648.1401112206043.18368.1401118322196@arcas%3E http://mail-archives.apache.org/mod_mbox/spark-issues/201405.mbox/%3CJIRA.12716648.1401112206043.18368.1401118322196@arcas%3E I have a job (mostly like this link: http://hastebin.com/quwamoreko.scala http://hastebin.com/quwamoreko.scala, but with a write-to-files-based-on-keys thing at the end) that is doing a join between a medium sized (like, 150,000 entries, classRDD in the link) RDD to a larger (108 million entry, objRDD in the link) RDD… the keys and values for each entry are quite small. In the linked join most objects will have 10 or so classes and most classes 100k associated objects. Though a few (10 or so?) classes will have millions of objects and some objects hundreds of classes. The issue i'm having is that (on an m2.xlarge ec2 instance) my container is overstepping the memory limits and being shut down This confuses me and makes me question my fundamental understanding of joins. I thought joins were a reduce operation that happened on disk. Further, my joins don’t seem to hold very much in memory, indeed at any given point a pair of strings and another string is all i seem to hold. The container limit is 7Gb according to the error in my container logs and has been apparently reasonable for jobs i’ve run in the past. But again, I don’t see where in my program i am actually keeping anything in memory at all. And yet sure enough, after about 30 minutes of running, over a time period of like 2 or so minutes, one of the containers grows from about 800mb to 7Gb and is promptly killed. So, my questions, what could be going on here and how can i fix it? Is this just some fundamental feature of my data or is there anything else i can do? Further rider questions: Is there some logger settings I can use for the logs to tell me exactly where in my job has been reached? i.e. which RDD is being constructed or which join is being performed? The RDD numbers and stages aren’t all that helpful and though i know the spark UI exists some logs i can refer back to when my cluster has long died would be great. Cheers - Sina
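Not an answer to the skew question itself, but the specific "container running beyond physical memory limits" error is usually YARN killing the executor because heap plus off-heap usage exceeded the container size, and a commonly suggested first mitigation is to reserve more overhead and give the join more partitions. A hedged sketch follows; the numbers are illustrative and the two small RDDs are placeholders for the classRDD/objRDD described in the post:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark versions

val conf = new SparkConf()
  .setAppName("join-job")
  .set("spark.yarn.executor.memoryOverhead", "1024")   // extra MB of off-heap headroom per executor
val sc = new SparkContext(conf)
// placeholders standing in for the classRDD / objRDD pair described above
val classRDD = sc.parallelize(Seq(("obj1", "classA"), ("obj2", "classB")))
val objRDD   = sc.parallelize(Seq(("obj1", "payload1"), ("obj2", "payload2")))
val joined   = objRDD.join(classRDD, 400)   // an explicit partition count spreads skewed keys over more tasks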
Re: groupByKey is not working
Amit - IJ will not find it until you add the import as Sean mentioned. It includes implicits that intellij will not know about otherwise. 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com: I am sorry Sean. I am developing code in intelliJ Idea. so with the above dependencies I am not able to find *groupByKey* when I am searching by ctrl+space On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote: When you post a question anywhere, and say it's not working, you *really* need to say what that means. On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 code: object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from spark-shell. Please help me Thanks Amit
Re: groupByKey is not working
Hi Charles, I forgot to mention. But I imported the following import au.com.bytecode.opencsv.CSVParser import org.apache.spark._ On Sat, Jan 31, 2015 at 2:09 AM, Charles Feduke charles.fed...@gmail.com wrote: Define not working. Not compiling? If so you need: import org.apache.spark.SparkContext._ On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 *code:* object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from *spark-shell.* Please help me Thanks Amit
Re: groupByKey is not working
Thank you very much Charles, I got it :) On Sat, Jan 31, 2015 at 2:20 AM, Charles Feduke charles.fed...@gmail.com wrote: You'll still need to: import org.apache.spark.SparkContext._ Importing org.apache.spark._ does _not_ recurse into sub-objects or sub-packages, it only brings in whatever is at the level of the package or object imported. SparkContext._ has some implicits, one of them for adding groupByKey to an RDD[_] IIRC. On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote: Amit - IJ will not find it until you add the import as Sean mentioned. It includes implicits that intellij will not know about otherwise. 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com: I am sorry Sean. I am developing code in intelliJ Idea. so with the above dependencies I am not able to find *groupByKey* when I am searching by ctrl+space On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote: When you post a question anywhere, and say it's not working, you *really* need to say what that means. On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 code: object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from spark-shell. Please help me Thanks Amit
Re: groupByKey is not working
I am sorry Sean. I am developing code in intelliJ Idea. so with the above dependencies I am not able to find *groupByKey* when I am searching by ctrl+space On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote: When you post a question anywhere, and say it's not working, you *really* need to say what that means. On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 code: object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from spark-shell. Please help me Thanks Amit
Re: groupByKey is not working
You'll still need to: import org.apache.spark.SparkContext._ Importing org.apache.spark._ does _not_ recurse into sub-objects or sub-packages, it only brings in whatever is at the level of the package or object imported. SparkContext._ has some implicits, one of them for adding groupByKey to an RDD[_] IIRC. On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote: Amit - IJ will not find it until you add the import as Sean mentioned. It includes implicits that intellij will not know about otherwise. 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com: I am sorry Sean. I am developing code in intelliJ Idea. so with the above dependencies I am not able to find *groupByKey* when I am searching by ctrl+space On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote: When you post a question anywhere, and say it's not working, you *really* need to say what that means. On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 code: object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from spark-shell. Please help me Thanks Amit
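For reference, a minimal self-contained sketch of the fix described above, assuming the same Spark 1.1.x / Scala 2.10 setup as the original post; the file path and CSV layout are taken from that post, everything else is illustrative:

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // the implicits that add groupByKey (and friends) to pair RDDs

object SparkJob {

  // Parse each CSV line into a (key, value) pair.
  def pLines(lines: Iterator[String]): Iterator[(String, Int)] = {
    val parser = new CSVParser()
    lines.map { l =>
      val vs = parser.parseLine(l)
      (vs(0), vs(1).toInt)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Job").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/home/amit/testData.csv").cache()
    // With SparkContext._ in scope, groupByKey resolves both in the compiler and in IntelliJ completion.
    val result = data.mapPartitions(pLines).groupByKey()
    result.take(5).foreach(println)
    sc.stop()
  }
}

From Spark 1.3 onwards the pair-RDD implicits are in scope automatically, so the extra import only matters on 1.2 and earlier, such as the 1.1.0 build used in this thread.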
Re: groupByKey is not working
Define not working. Not compiling? If so you need: import org.apache.spark.SparkContext._ On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 *code:* object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from *spark-shell.* Please help me Thanks Amit
groupByKey is not working
Hi all, my sbt file is like this:

name := "Spark"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"

*code:*

object SparkJob {
  def pLines(lines: Iterator[String]) = {
    val parser = new CSVParser()
    lines.map { l =>
      val vs = parser.parseLine(l)
      (vs(0), vs(1).toInt)
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Job").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/home/amit/testData.csv").cache()
    val result = data.mapPartitions(pLines).groupByKey
    //val list = result.filter(x => (x._1).contains("24050881"))
  }
}

Here groupByKey is not working, but the same thing works from *spark-shell*. Please help me. Thanks, Amit
Re: groupByKey is not working
Hi Amit, What error does it throw? Thanks Arush On Sat, Jan 31, 2015 at 1:50 AM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := Spark version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.1.0 libraryDependencies += net.sf.opencsv % opencsv % 2.3 *code:* object SparkJob { def pLines(lines:Iterator[String])={ val parser=new CSVParser() lines.map(l={val vs=parser.parseLine(l) (vs(0),vs(1).toInt)}) } def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Job).setMaster(local) val sc = new SparkContext(conf) val data = sc.textFile(/home/amit/testData.csv).cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x= {(x._1).contains(24050881)}) } } Here groupByKey is not working . But same thing is working from *spark-shell.* Please help me Thanks Amit -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
PySpark Loading Json Following by groupByKey seems broken in spark 1.1.1
When I run a groupByKey it seems to create a single task after the groupByKey that never stops executing. I'm loading a smallish JSON dataset that is 4 million. This is the code I'm running:

rdd = sql_context.jsonFile(uri)
rdd = rdd.cache()
grouped = rdd.map(lambda row: (row.id, row)).groupByKey(160)
grouped.take(1)

The groupByKey stage takes a few minutes with 160 tasks, which is expected. However, it then creates a single task ("runJob at PythonRDD.scala:300") that never ends. I gave up after 30 minutes. http://apache-spark-user-list.1001560.n3.nabble.com/file/n20559/Screen_Shot_2014-12-05_at_6.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Loading-Json-Following-by-groupByKey-seems-broken-in-spark-1-1-1-tp20559.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Alternatives to groupByKey
I think it would depend on the type and amount of information you're collecting. If you're just trying to collect small numbers for each window, and don't have an overwhelming number of windows, you might consider using accumulators. Just make one per value per time window, and for each data point, add it to the accumulators for the time windows in which it belongs. We've found this approach a lot faster than anything involving a shuffle. This should work fine for stuff like max(), min(), and mean(). If you're collecting enough data that accumulators are impractical, I think I would try multiple passes. Cache your data, and for each pass, filter to that window, and perform all your operations on the filtered RDD. Because of the caching, it won't be significantly slower than processing it all at once - in fact, it will probably be a lot faster, because the shuffles are shuffling less information. This is similar to what you're suggesting about partitioning your RDD, but probably simpler and easier. That being said, your restriction 3 seems to be in contradiction to the rest of your request - if your aggregation needs to be able to look at all the data at once, then that seems contradictory to viewing the data through an RDD. Could you explain a bit more what you mean by that? -Nathan On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote: Hi, So my Spark app needs to run a sliding window through a time series dataset (I'm not using Spark streaming), and then run different types of aggregations on a per-window basis. Right now I'm using a groupByKey() which gives me Iterables for each window. There are a few concerns I have with this approach: 1. groupByKey() could potentially fail for a key not fitting in memory. 2. I'd like to run aggregations like max(), mean() on each of the groups; it'd be nice to have the RDD functionality at this point instead of the iterables. 3. I can't use reduceByKey() or aggregateByKey() as some of my aggregations need to have a view of the entire window. The only other way I could think of is partitioning my RDDs into multiple RDDs with each RDD representing a window. Is this a sensible approach? Or is there any other way of going about this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
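A rough sketch of the multiple-pass approach described above; the Event record type, the window list, and the stats() aggregation are placeholders for illustration, not code from this thread:

import org.apache.spark.SparkContext._   // Double/pair RDD helpers; only needed explicitly on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

// Hypothetical record type for the sketch: a timestamp plus one measurement.
case class Event(time: Long, value: Double)

// One pass per window over a cached RDD: filter to the window, then aggregate
// the filtered data directly, so no shuffle of the whole dataset is involved.
def perWindowStats(events: RDD[Event],
                   windows: Seq[(Long, Long)]): Seq[((Long, Long), (Double, Double, Double))] = {
  events.cache()                                   // cache once; every windowed pass reuses the cached data
  windows.map { case (start, end) =>
    val w = events.filter(e => e.time >= start && e.time < end)
    val s = w.map(_.value).stats()                 // StatCounter: count, mean, stdev, max, min
    ((start, end), (s.min, s.max, s.mean))
  }
}

Each pass shuffles nothing; the only cost is one scan of the cached data per window.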
Re: Alternatives to groupByKey
do these requirements boils down to a need for foldLeftByKey with sorting of the values? https://issues.apache.org/jira/browse/SPARK-3655 On Wed, Dec 3, 2014 at 6:34 PM, Xuefeng Wu ben...@gmail.com wrote: I have similar requirememt,take top N by key. right now I use groupByKey,but one key would group more than half data in some dataset. Yours, Xuefeng Wu 吴雪峰 敬上 On 2014年12月4日, at 上午7:26, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I think it would depend on the type and amount of information you're collecting. If you're just trying to collect small numbers for each window, and don't have an overwhelming number of windows, you might consider using accumulators. Just make one per value per time window, and for each data point, add it to the accumulators for the time windows in which it belongs. We've found this approach a lot faster than anything involving a shuffle. This should work fine for stuff like max(), min(), and mean() If you're collecting enough data that accumulators are impractical, I think I would try multiple passes. Cache your data, and for each pass, filter to that window, and perform all your operations on the filtered RDD. Because of the caching, it won't be significantly slower than processing it all at once - in fact, it will probably be a lot faster, because the shuffles are shuffling less information. This is similar to what you're suggesting about partitioning your rdd, but probably simpler and easier. That being said, your restriction 3 seems to be in contradiction to the rest of your request - if your aggregation needs to be able to look at all the data at once, then that seems contradictory to viewing the data through an RDD. Could you explain a bit more what you mean by that? -Nathan On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote: Hi, So my Spark app needs to run a sliding window through a time series dataset (I'm not using Spark streaming). And then run different types on aggregations on per window basis. Right now I'm using a groupByKey() which gives me Iterables for each window. There are a few concerns I have with this approach: 1. groupByKey() could potentially fail for a key not fitting in the memory. 2. I'd like to run aggregations like max(), mean() on each of the groups, it'd be nice to have the RDD functionality at this point instead of the iterables. 3. I can't use reduceByKey() or aggregateByKey() are some of my aggregations need to have a view of the entire window. Only other way I could think of is partitioning my RDDs into multiple RDDs with each RDD representing a window. Is this a sensible approach? Or is there any other way of going about this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
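For the take-top-N-by-key case mentioned above, one sketch that avoids grouping everything for a skewed key is aggregateByKey with a bounded per-key buffer; the key and value types below are illustrative only, not taken from this thread:

import org.apache.spark.SparkContext._   // pair-RDD implicits; only needed explicitly on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

// Keep only the n largest values per key. The per-key state never exceeds n elements,
// so a heavily skewed key no longer has to pull all of its values onto one node.
def topNByKey(pairs: RDD[(String, Int)], n: Int): RDD[(String, Seq[Int])] =
  pairs.aggregateByKey(Seq.empty[Int])(
    (acc, v) => (acc :+ v).sorted.takeRight(n),    // fold one value into the running top n
    (a, b)   => (a ++ b).sorted.takeRight(n)       // merge two partial top-n buffers
  ).mapValues(_.sortBy(x => -x))                   // present each key's values in descending order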
Re: Alternatives to groupByKey
Looks good. My concern about foldLeftByKey is that it looks inconsistent with foldLeft on RDD and aggregateByKey on PairRDD. Yours, Xuefeng Wu 吴雪峰 敬上 On 2014年12月4日, at 上午7:47, Koert Kuipers ko...@tresata.com wrote: foldLeftByKey - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to make operation like cogrop() , groupbykey() on pair RDD = [ [ ], [ ] , [ ] ]
Hi, You just need to add list() in the sorted function. For example:

map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted(list(rdd1.cogroup(rdd2).collect())))

I think you just forgot the list... PS: your post has NOT been accepted by the mailing list yet. Best Gen

pm wrote: Hi, thanks for the reply. Now, after doing the cogroup mentioned below,

merge_rdd = map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted((rdd1.cogroup(rdd2).collect())))
map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted((merge_rdd.cogroup(rdd3).collect())))

I am getting output like

[((u'abc', u'0010'), ([(<pyspark.resultiterable.ResultIterable at 0x4b1b4d0>, <pyspark.resultiterable.ResultIterable at 0x4b1b550>)], [[(u'address', u'2017 CAN'), (u'address_city', u'VESTAVIA '), ... ]])), ((u'abc', u'0020'), ([(<pyspark.resultiterable.ResultIterable at 0x4b1bd50>, <pyspark.resultiterable.ResultIterable at 0x4b1bf10>)], [[(u'address', u'2017 CAN'), (u'address_city', u'VESTAV'), ... ]]))]

How do I show the value of an object like <pyspark.resultiterable.ResultIterable at 0x4b1b4d0>? I want to show the data for <pyspark.resultiterable.ResultIterable at 0x4b1bd50>. Could you please tell me the way to show the data for those objects? I am using Python. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-operation-like-cogrop-groupbykey-on-pair-RDD-tp16487p16598.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to make operation like cogrop() , groupbykey() on pair RDD = [ [ ], [ ] , [ ] ]
What results do you want? If your pair is like (a, b), where a is the key and b is the value, you can try rdd1 = rdd1.flatMap(lambda l: l) and then use cogroup. Best Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-operation-like-cogrop-groupbykey-on-pair-RDD-tp16487p16489.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark groupByKey partition out of memory
When a MappedRDD is handled by the groupByKey transformation, tuples with the same key that are distributed across different worker nodes are collected onto one worker node, i.e. (K, V1), (K, V2), ..., (K, Vn) -> (K, Seq(V1, V2, ..., Vn)). I want to know whether the value Seq(V1, V2, ..., Vn) of a tuple in the grouped RDD can reside on different nodes, or has to be on one node, if I set the number of partitions when using groupByKey. If the value Seq(V1, V2, ..., Vn) can only reside in the memory of just one machine, there is an out-of-memory risk whenever the size of Seq(V1, V2, ..., Vn) is larger than the JVM memory limit of that machine. If this happens, how should we deal with it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-groupByKey-partition-out-of-memory-tp13669.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
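With groupByKey, all values for one key do end up in the same partition and are handled by a single task, so when the grouped Seq is only feeding an aggregate, a combining transformation avoids materializing it at all; a sketch with illustrative key/value types (mean per key):

import org.apache.spark.SparkContext._   // pair-RDD implicits; only needed explicitly on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

// Combine map-side so each key only ever carries a small (sum, count) pair across
// the shuffle, instead of the whole Seq(V1, ..., Vn) of values.
def meanByKey(pairs: RDD[(String, Double)]): RDD[(String, Double)] =
  pairs
    .mapValues(v => (v, 1L))                              // (value, count) so the combine state stays tiny
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // merge partial (sum, count) pairs, map-side first
    .mapValues { case (sum, count) => sum / count }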