Re: Replacing groupByKey() with reduceByKey()
Hi Santhosh, My name is not Bipin, it's Biplob, as is clear from my signature. Regarding your question, I have no clue what your map operation is doing on the grouped data, so I can only suggest you do:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).reduceByKey(build_edges, 25)

although, based on the return type, you would have to modify your build_edges function. Thanks & Regards Biplob Biswas

On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB wrote: > Hey Bipin, > Thanks for the reply, I am actually aggregating after the groupByKey() > operation, > I have posted the wrong code snippet in my first email. Here is what I am > doing > > dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: > (x[0],x)).groupByKey(25).map(build_edges) > > Can we replace reduceByKey() in this context ? > > Santhosh > > > On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas > wrote: > >> Hi Santhosh, >> >> If you are not performing any aggregation, then I don't think you can >> replace your groupbykey with a reducebykey, and as I see you are only >> grouping and taking 2 values of the result, thus I believe you can't just >> replace your groupbykey with that. >> >> Thanks & Regards >> Biplob Biswas >> >> >> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: >> >>> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark >>> and python newbie and I am having a hard time figuring out the lambda >>> function for the reduceByKey() operation. >>> >>> Here is the code >>> >>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: >>> (x[0],x)).groupByKey(25).take(2) >>> >>> Here is the return value >>> >>> >>> dd[(u'KEY_1', >> >>> 0x107be0c50>), (u'KEY_2', >> >>> at 0x107be0c10>)] >>> >>> and Here are the iterable contents dd[0][1] >>> >>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', >>> value=u'e7dc1f2a')Row(key=u'KEY_1', >>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', >>> value=u'fb0bc953')...Row(key=u'KEY_1', >>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', >>> value=u'd39714d3')Row(key=u'KEY_1', >>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') >>> >>> My question is how do replace with reduceByKey() and get the same >>> output as above? >>> >>> Santhosh >>> >> >
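For illustration only (not part of the thread; the names and values are made up, and sc is an existing SparkContext): the contract reduceByKey expects, shown as a minimal Scala sketch, is a merge function that takes two values of the key's value type and returns a single value of that same type, applied pairwise rather than to the whole group. The same shape applies to the PySpark build_edges function discussed above.

// Each value is a Seq; the merge function concatenates two Seqs, so its
// result type matches its argument types, which is what reduceByKey requires.
val pairs = sc.parallelize(Seq(("k1", Seq(1)), ("k1", Seq(2)), ("k2", Seq(3))))
val merged = pairs.reduceByKey((a, b) => a ++ b)   // (V, V) => V
merged.collect()   // e.g. Array((k1, List(1, 2)), (k2, List(3)))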
Re: Replacing groupByKey() with reduceByKey()
Hey Bipin, Thanks for the reply, I am actually aggregating after the groupByKey() operation, I have posted the wrong code snippet in my first email. Here is what I am doing dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).map(build_edges) Can we replace reduceByKey() in this context ? Santhosh On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas wrote: > Hi Santhosh, > > If you are not performing any aggregation, then I don't think you can > replace your groupbykey with a reducebykey, and as I see you are only > grouping and taking 2 values of the result, thus I believe you can't just > replace your groupbykey with that. > > Thanks & Regards > Biplob Biswas > > > On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: > >> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark >> and python newbie and I am having a hard time figuring out the lambda >> function for the reduceByKey() operation. >> >> Here is the code >> >> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: >> (x[0],x)).groupByKey(25).take(2) >> >> Here is the return value >> >> >>> dd[(u'KEY_1', > >>> 0x107be0c50>), (u'KEY_2', > >>> at 0x107be0c10>)] >> >> and Here are the iterable contents dd[0][1] >> >> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', >> value=u'e7dc1f2a')Row(key=u'KEY_1', >> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', >> value=u'fb0bc953')...Row(key=u'KEY_1', >> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', >> value=u'd39714d3')Row(key=u'KEY_1', >> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') >> >> My question is how do replace with reduceByKey() and get the same output >> as above? >> >> Santhosh >> >
Re: Replacing groupByKey() with reduceByKey()
Hi Santhosh, If you are not performing any aggregation, then I don't think you can replace your groupbykey with a reducebykey, and as I see you are only grouping and taking 2 values of the result, thus I believe you can't just replace your groupbykey with that. Thanks & Regards Biplob Biswas On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote: > I am trying to replace groupByKey() with reudceByKey(), I am a pyspark > and python newbie and I am having a hard time figuring out the lambda > function for the reduceByKey() operation. > > Here is the code > > dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: > (x[0],x)).groupByKey(25).take(2) > > Here is the return value > > >>> dd[(u'KEY_1', >>> 0x107be0c50>), (u'KEY_2', >>> at 0x107be0c10>)] > > and Here are the iterable contents dd[0][1] > > Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', > value=u'e7dc1f2a')Row(key=u'KEY_1', > hash_fn=u'f8891048a9ef8331227b4af080ecd28a', > value=u'fb0bc953')...Row(key=u'KEY_1', > hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', > value=u'd39714d3')Row(key=u'KEY_1', > hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92') > > My question is how do replace with reduceByKey() and get the same output > as above? > > Santhosh >
Replacing groupByKey() with reduceByKey()
I am trying to replace groupByKey() with reduceByKey(). I am a PySpark and Python newbie and I am having a hard time figuring out the lambda function for the reduceByKey() operation.

Here is the code:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).take(2)

Here is the return value:

>>> dd
[(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]

and here are the iterable contents of dd[0][1]:

Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
...
Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')

My question is: how do I replace this with reduceByKey() and get the same output as above?

Santhosh
Re: Iterative RDD union + reduceByKey operations on a small dataset lead to "No space left on device" error on account of a lot of shuffle spill.
Yeah, you are right. I ran the experiments locally not on YARN. On Fri, Jul 27, 2018 at 11:54 PM, Vadim Semenov wrote: > `spark.worker.cleanup.enabled=true` doesn't work for YARN. > On Fri, Jul 27, 2018 at 8:52 AM dineshdharme > wrote: > > > > I am trying to do few (union + reduceByKey) operations on a hiearchical > > dataset in a iterative fashion in rdd. The first few loops run fine but > on > > the subsequent loops, the operations ends up using the whole scratch > space > > provided to it. > > > > I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one > > having 100 GB space. > > The heirarchical dataset, whose size is (< 400kB), remains constant > > throughout the iterations. > > I have tried the worker cleanup flag but it has no effect i.e. > > "spark.worker.cleanup.enabled=true" > > > > > > > > Error : > > Caused by: java.io.IOException: No space left on device > > at java.io.FileOutputStream.writeBytes(Native Method) > > at java.io.FileOutputStream.write(FileOutputStream.java:326) > > at java.io.BufferedOutputStream.flushBuffer( > BufferedOutputStream.java:82) > > at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) > > at java.io.DataOutputStream.writeLong(DataOutputStream.java:224) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp( > IndexShuffleBlockResolver.scala:151) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply( > IndexShuffleBlockResolver.scala:149) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply( > IndexShuffleBlockResolver.scala:149) > > at > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized. > scala:33) > > at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver. > scala:149) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$ > writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) > > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) > > at > > org.apache.spark.shuffle.IndexShuffleBlockResolver. > writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153) > > at > > org.apache.spark.shuffle.sort.SortShuffleWriter.write( > SortShuffleWriter.scala:73) > > at > > org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:96) > > at > > org.apache.spark.scheduler.ShuffleMapTask.runTask( > ShuffleMapTask.scala:53) > > at org.apache.spark.scheduler.Task.run(Task.scala:109) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > > at > > java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1149) > > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:624) > > at java.lang.Thread.run(Thread.java:748) > > > > > > What I am trying to do (High Level): > > > > I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21, > > Child22 ) which are related in a hierarchical fashion as shown below. 
> > > > Parent-> Child1 -> Child2 -> Child21 > > > > Parent-> Child1 -> Child2 -> Child22 > > > > Each element in the tree has 14 columns (elementid, parentelement_id, > cat1, > > cat2, num1, num2,., num10) > > > > I am trying to aggregate the values of one column of Child21 into Child1 > > (i.e. 2 levels up). I am doing the same for another column value of > Child22 > > into Child1. Then I am merging these aggregated values at the same Child1 > > level. > > > > This is present in the code at location : > > > > spark.rddexample.dummyrdd.tree.child1.events.Function1 > > > > > > Code which replicates the issue: > > > > 1] https://github.com/dineshdharme/SparkRddShuffleIssue > > > > > > > > Steps to reproduce the issue : > > > > 1] Clone the above repository. > > > > 2] Put the csvs in the "issue-data" folder in the above repository at a > > hadoop location "hdfs:///tr
Re: Iterative RDD union + reduceByKey operations on a small dataset lead to "No space left on device" error on account of a lot of shuffle spill.
`spark.worker.cleanup.enabled=true` doesn't work for YARN. On Fri, Jul 27, 2018 at 8:52 AM dineshdharme wrote: > > I am trying to do few (union + reduceByKey) operations on a hiearchical > dataset in a iterative fashion in rdd. The first few loops run fine but on > the subsequent loops, the operations ends up using the whole scratch space > provided to it. > > I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one > having 100 GB space. > The heirarchical dataset, whose size is (< 400kB), remains constant > throughout the iterations. > I have tried the worker cleanup flag but it has no effect i.e. > "spark.worker.cleanup.enabled=true" > > > > Error : > Caused by: java.io.IOException: No space left on device > at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) > at java.io.DataOutputStream.writeLong(DataOutputStream.java:224) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > > What I am trying to do (High Level): > > I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21, > Child22 ) which are related in a hierarchical fashion as shown below. > > Parent-> Child1 -> Child2 -> Child21 > > Parent-> Child1 -> Child2 -> Child22 > > Each element in the tree has 14 columns (elementid, parentelement_id, cat1, > cat2, num1, num2,., num10) > > I am trying to aggregate the values of one column of Child21 into Child1 > (i.e. 2 levels up). I am doing the same for another column value of Child22 > into Child1. Then I am merging these aggregated values at the same Child1 > level. 
> > This is present in the code at location : > > spark.rddexample.dummyrdd.tree.child1.events.Function1 > > > Code which replicates the issue: > > 1] https://github.com/dineshdharme/SparkRddShuffleIssue > > > > Steps to reproduce the issue : > > 1] Clone the above repository. > > 2] Put the csvs in the "issue-data" folder in the above repository at a > hadoop location "hdfs:///tree/dummy/data/" > > 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has > large space. (> 100 GB) > > 4] Run "sbt assembly" > > 5] Run the following command at the project location > > /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \ > --class spark.rddexample.dummyrdd.FunctionExecutor \ > --master local[2] \ > --deploy-mode client \ > --executor-memory 2G \ > --driver-memory 2G \ > target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \ > 20 \ > hdfs:///tree/dummy/data/ \ > hdfs:///tree/dummy/results/ > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > -- Sent from my iPhone - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Iterative RDD union + reduceByKey operations on a small dataset lead to "No space left on device" error on account of a lot of shuffle spill.
I am trying to do a few (union + reduceByKey) operations on a hierarchical dataset in an iterative fashion with RDDs. The first few loops run fine, but on the subsequent loops the operations end up using the whole scratch space provided.

I have set the Spark scratch directory, i.e. SPARK_LOCAL_DIRS, to one having 100 GB of space. The hierarchical dataset, whose size is small (< 400 kB), remains constant throughout the iterations. I have tried the worker cleanup flag ("spark.worker.cleanup.enabled=true") but it has no effect.

Error:

Caused by: java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

What I am trying to do (high level):

I have a dataset of 5 different CSVs (Parent, Child1, Child2, Child21, Child22) which are related in a hierarchical fashion as shown below.

Parent -> Child1 -> Child2 -> Child21

Parent -> Child1 -> Child2 -> Child22

Each element in the tree has 14 columns (elementid, parentelement_id, cat1, cat2, num1, num2, ..., num10).

I am trying to aggregate the values of one column of Child21 into Child1 (i.e. 2 levels up). I am doing the same for another column value of Child22 into Child1. Then I am merging these aggregated values at the same Child1 level.

This is present in the code at location: spark.rddexample.dummyrdd.tree.child1.events.Function1

Code which replicates the issue:

1] https://github.com/dineshdharme/SparkRddShuffleIssue

Steps to reproduce the issue:

1] Clone the above repository.
2] Put the CSVs in the "issue-data" folder in the above repository at the HDFS location "hdfs:///tree/dummy/data/"
3] Set the Spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has a large amount of space (> 100 GB).
4] Run "sbt assembly"
5] Run the following command at the project location:

/path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
--class spark.rddexample.dummyrdd.FunctionExecutor \
--master local[2] \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 2G \
target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
20 \
hdfs:///tree/dummy/data/ \
hdfs:///tree/dummy/results/

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
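As a rough Scala sketch of the iterative pattern described above (the key type, the values, and the per-iteration input are illustrative and not taken from the linked repository; sc is an existing SparkContext): each pass unions the running aggregate with newly derived pairs and reduces again, so the lineage and the shuffle files written per iteration accumulate across loops unless they are cleaned up or the lineage is truncated.

import org.apache.spark.rdd.RDD

// stand-in for the per-iteration contribution derived from the hierarchical CSVs
def levelContribution(i: Int): RDD[(String, Double)] =
  sc.parallelize(Seq(("child1-a", i.toDouble), ("child1-b", i.toDouble)))

var aggregated: RDD[(String, Double)] = sc.emptyRDD[(String, Double)]
for (i <- 1 to 20) {
  // union + reduceByKey on every pass: each iteration adds another shuffle stage
  aggregated = aggregated.union(levelContribution(i)).reduceByKey(_ + _)
}
aggregated.count()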
Re: Structured Stream equivalent of reduceByKey
Hmmm, I see. You could output the delta using flatMapGroupsWithState <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout-> probably. On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <piyush.muk...@gmail.com> wrote: > Thanks, Michael > I have explored Aggregator > <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator> > with > update mode. The problem is it will give the overall aggregated value for > the changed. while I only want the delta change in the group as the > aggregation we are doing at sink level too. > > Below is the plan generated with count Aggregator. > > *HashAggregate > StateStoreSave > *HashAggregate, > StateStoreRestore > *HashAggregate, > Exchange > *HashAggregate, > *Project > StreamingRelation > > we are looking for some aggregation which will avoid state > store interactions. > > Also anyone aware of any design doc or some example about how we can add > new operation on dataSet and corresponding physical plan. > > > > On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> - dev >> >> I think you should be able to write an Aggregator >> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>. >> You probably want to run in update mode if you are looking for it to output >> any group that has changed in the batch. >> >> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com> >> wrote: >> >>> Hi, >>> we are migrating some jobs from Dstream to Structured Stream. >>> >>> Currently to handle aggregations we call map and reducebyKey on each RDD >>> like >>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b)) >>> >>> The final output of each RDD is merged to the sink with support for >>> aggregation at the sink( Like co-processor at HBase ). >>> >>> In the new DataSet API, I am not finding any suitable API to aggregate >>> over the micro-batch. >>> Most of the aggregation API uses state-store and provide global >>> aggregations. ( with append mode it does not give the change in existing >>> buckets ) >>> Problems we are suspecting are : >>> 1) state-store is tightly linked to the job definitions. while in our >>> case we want may edit the job while keeping the older calculated aggregate >>> as it is. >>> >>> The desired result can be achieved with below dataset APIs. >>> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => >>> merge(valueItr)) >>> while on observing the physical plan it does not call any merge before >>> sort. >>> >>> Anyone aware of API or other workarounds to get the desired result? >>> >> >> >
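A rough Scala sketch of the flatMapGroupsWithState idea suggested above (the Event schema, the rate source, and the counting logic are all illustrative, not from the thread): the running total stays in the per-key state, and only the current micro-batch's delta is emitted.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)

val spark = SparkSession.builder.appName("delta-sketch").master("local[2]").getOrCreate()
import spark.implicits._

// toy streaming source; a real job would read from Kafka or similar
val events = spark.readStream.format("rate").load()
  .select($"value").as[Long]
  .map(v => Event((v % 10).toString, 1L))

val deltas = events
  .groupByKey(_.key)
  .flatMapGroupsWithState[Long, (String, Long)](OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (key, values, state: GroupState[Long]) =>
      val batchSum = values.map(_.value).sum            // contribution of this micro-batch only
      state.update(state.getOption.getOrElse(0L) + batchSum)
      Iterator((key, batchSum))                         // emit the delta, not the running total
  }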
Re: Structured Stream equivalent of reduceByKey
Thanks, Michael. I have explored Aggregator <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator> with update mode. The problem is that it gives the overall aggregated value for each changed group, while I only want the delta change in the group, since we are doing the aggregation at the sink level too.

Below is the plan generated with a count Aggregator:

*HashAggregate
StateStoreSave
*HashAggregate
StateStoreRestore
*HashAggregate
Exchange
*HashAggregate
*Project
StreamingRelation

We are looking for some aggregation that avoids state-store interactions.

Also, is anyone aware of any design doc or example of how to add a new operation on Dataset and a corresponding physical plan?

On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com> wrote: > - dev > > I think you should be able to write an Aggregator > <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>. > You probably want to run in update mode if you are looking for it to output > any group that has changed in the batch. > > On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com> > wrote: > >> Hi, >> we are migrating some jobs from Dstream to Structured Stream. >> >> Currently to handle aggregations we call map and reducebyKey on each RDD >> like >> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b)) >> >> The final output of each RDD is merged to the sink with support for >> aggregation at the sink( Like co-processor at HBase ). >> >> In the new DataSet API, I am not finding any suitable API to aggregate >> over the micro-batch. >> Most of the aggregation API uses state-store and provide global >> aggregations. ( with append mode it does not give the change in existing >> buckets ) >> Problems we are suspecting are : >> 1) state-store is tightly linked to the job definitions. while in our >> case we want may edit the job while keeping the older calculated aggregate >> as it is. >> >> The desired result can be achieved with below dataset APIs. >> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => >> merge(valueItr)) >> while on observing the physical plan it does not call any merge before >> sort. >> >> Anyone aware of API or other workarounds to get the desired result? >> >> >
Re: Structured Stream equivalent of reduceByKey
- dev I think you should be able to write an Aggregator <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>. You probably want to run in update mode if you are looking for it to output any group that has changed in the batch. On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com> wrote: > Hi, > we are migrating some jobs from Dstream to Structured Stream. > > Currently to handle aggregations we call map and reducebyKey on each RDD > like > rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b)) > > The final output of each RDD is merged to the sink with support for > aggregation at the sink( Like co-processor at HBase ). > > In the new DataSet API, I am not finding any suitable API to aggregate > over the micro-batch. > Most of the aggregation API uses state-store and provide global > aggregations. ( with append mode it does not give the change in existing > buckets ) > Problems we are suspecting are : > 1) state-store is tightly linked to the job definitions. while in our > case we want may edit the job while keeping the older calculated aggregate > as it is. > > The desired result can be achieved with below dataset APIs. > dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => merge(valueItr)) > while on observing the physical plan it does not call any merge before > sort. > > Anyone aware of API or other workarounds to get the desired result? >
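For reference, a minimal Scala sketch of the kind of Aggregator suggested above (the Event type and the keying are illustrative; this one simply counts records per group and would typically be run in update output mode):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Event(key: String, value: Long)

// counts the events seen for a group
object CountAgg extends Aggregator[Event, Long, Long] {
  def zero: Long = 0L
  def reduce(buffer: Long, e: Event): Long = buffer + 1
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(buffer: Long): Long = buffer
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// on a (streaming) Dataset[Event], typically:
//   events.groupByKey(_.key).agg(CountAgg.toColumn)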
Structured Stream equivalent of reduceByKey
Hi, we are migrating some jobs from DStreams to Structured Streaming.

Currently, to handle aggregations, we call map and reduceByKey on each RDD, like:

rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))

The final output of each RDD is merged to the sink, with support for aggregation at the sink (like a co-processor in HBase).

In the new Dataset API, I am not finding any suitable API to aggregate over the micro-batch. Most of the aggregation APIs use the state store and provide global aggregations (with append mode they do not give the change in existing buckets). The problems we suspect are: 1) the state store is tightly linked to the job definition, while in our case we may want to edit the job while keeping the older calculated aggregates as they are.

The desired result can be achieved with the Dataset API below:

dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))

but on observing the physical plan, it does not call any merge before the sort.

Is anyone aware of an API or other workaround to get the desired result?
Re: reducebykey
Hi Stephen, If you use aggregate functions or reduceGroups on KeyValueGroupedDataset, it behaves like reduceByKey on an RDD. Only if you use flatMapGroups or mapGroups does it behave like groupByKey on an RDD, and if you read the API documentation it warns about using those. Hope this helps. Thanks Ankur On Fri, Apr 7, 2017 at 7:26 AM, Stephen Fletcher <stephen.fletc...@gmail.com > wrote: > Are there plans to add reduceByKey to dataframes, Since switching over to > spark 2 I find myself increasing dissatisfied with the idea of converting > dataframes to RDD to do procedural programming on grouped data(both from a > ease of programming stance and performance stance). So I've been using > Dataframe's experimental groupByKey and flatMapGroups which perform > extremely well, I'm guessing because of the encoders, but the amount of > data being transfers is a little excessive. Is there any plans to port > reduceByKey ( and additionally a reduceByKeyleft and right)? >
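A minimal Scala sketch of the reduceGroups pattern described above (the Record type and the merge logic are illustrative; spark is an existing SparkSession): values are combined pairwise per key, much like reduceByKey on an RDD, without materialising each group.

import spark.implicits._

case class Record(key: String, amount: Long)

val ds = Seq(Record("a", 1L), Record("a", 2L), Record("b", 5L)).toDS()

// combine values per key; result is a Dataset[(String, Record)]
val reduced = ds.groupByKey(_.key)
  .reduceGroups((a, b) => Record(a.key, a.amount + b.amount))

reduced.show()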
reducebykey
Are there plans to add reduceByKey to DataFrames? Since switching over to Spark 2 I find myself increasingly dissatisfied with the idea of converting DataFrames to RDDs to do procedural programming on grouped data (from both an ease-of-programming and a performance standpoint). So I've been using the DataFrame's experimental groupByKey and flatMapGroups, which perform extremely well, I'm guessing because of the encoders, but the amount of data being transferred is a little excessive. Are there any plans to port reduceByKey (and additionally a reduceByKeyLeft and right)?
[Spark Core]: flatMap/reduceByKey seems to be quite slow with Long keys on some distributions
Hi all,

I'm using Spark to process some corpora and I need to count the occurrences of each 2-gram. I started by counting tuples (wordID1, wordID2) and it worked fine, except for the large memory usage and GC overhead due to the substantial number of small tuple objects. Then I tried to pack the pair of Ints into a Long, and the GC overhead did drop greatly, but the run time also increased several times over.

I ran some small experiments with random data on different distributions. It seems that the performance issue only occurs on exponentially distributed data. The example code is below. The job is split into two stages, flatMap() and count(). When counting Tuples, flatMap() takes about 6s and count() takes about 2s, while when counting Longs, flatMap() takes 18s and count() takes 10s.

I haven't looked into Spark's implementation of flatMap/reduceByKey, but I guess Spark has some specialization for Long keys which happens not to perform very well on some specific distributions. Does anyone have ideas about this?

Best wishes,
Richard

// lines of word IDs
val data = (1 to 5000).par.map({ _ =>
  (1 to 1000) map { _ => (-1000 * Math.log(Random.nextDouble)).toInt }
}).seq

// count Tuples, fast
sc.parallelize(data).flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for (pair <- first zip second) yield (pair, 1L)
}.reduceByKey(_ + _).count()

// count Longs, slow
sc.parallelize(data).flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for ((a, b) <- first zip second) yield ((a.toLong << 32) | b, 1L)
}.reduceByKey(_ + _).count()
groupByKey vs reduceByKey
Hi, I read somewhere that groupByKey() on an RDD disables map-side aggregation, because the aggregation function (appending to a list) does not save any space. However, from my understanding, using something like reduceByKey (or combineByKey with a combiner function) we could reduce the amount of data shuffled around. I am wondering why map-side aggregation is disabled for groupByKey() and why it wouldn't save space at the executor where data is received after the shuffle. cheers Appu
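To make the contrast concrete, a small Scala sketch (word-count shaped, names made up; sc is an existing SparkContext): reduceByKey combines values per key within each map-side partition before the shuffle, so at most one record per key per partition crosses the network, whereas groupByKey ships every (key, value) pair and only then gathers them into an iterable.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1)))

// map-side combine: partial sums are computed before the shuffle
val viaReduce = pairs.reduceByKey(_ + _)

// no map-side combine: all pairs are shuffled, then summed per key
val viaGroup = pairs.groupByKey().mapValues(_.sum)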
Access broadcast variable from within function passed to reduceByKey
For example:

rows.reduceByKey(reduceKeyMapFunction)

def reduceKeyMapFunction(log1: Map[String, Long], log2: Map[String, Long]): Map[String, Long] = {
  val bcast = broadcastv.value
  val countFields = dbh.getCountFields
  val aggs: Map[String, Long] = Map()
  countFields.foreach { f =>
    val valueSum = aggLog1(f) + aggLog2(f)
    aggs ++ Map(f -> valueSum)
  }
  aggs
}

I can't pass the broadcast to reduceKeyMapFunction. I create the broadcast variable (broadcastv) in the driver, but I fear it will not be initialized on the workers where reduceKeyMapFunction runs. I've tried this, but when accessing broadcastv an NPE is thrown. I can't pass it to reduceKeyMapFunction because it can only accept two params (log1, log2). Any ideas?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Access-broadcast-variable-from-within-function-passed-to-reduceByKey-tp28082.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
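One common workaround, shown here as a sketch rather than an answer from the thread (the field names, the sample data, and the mergeLogs/countFieldsBc names are illustrative; sc is an existing SparkContext): define the merge function as a closure that captures the Broadcast handle, so reduceByKey still receives a two-argument function while the broadcast value is read on the executors.

import org.apache.spark.broadcast.Broadcast

val countFieldsBc: Broadcast[Seq[String]] = sc.broadcast(Seq("clicks", "views"))

// curried: the first argument list captures the broadcast, the second is the
// (V, V) => V merge function that reduceByKey expects
def mergeLogs(fields: Broadcast[Seq[String]])(log1: Map[String, Long], log2: Map[String, Long]): Map[String, Long] =
  fields.value.map(f => f -> (log1.getOrElse(f, 0L) + log2.getOrElse(f, 0L))).toMap

val rows = sc.parallelize(Seq(("k1", Map("clicks" -> 1L)), ("k1", Map("views" -> 2L))))
val reduced = rows.reduceByKey(mergeLogs(countFieldsBc) _)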
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
I haven't tried rdd.distinct. I thought since redcuceByKey itself is not helping me even with a sliding window here ,so i thought rdd.distinct might not help . I will write a minimal code for reproducing the issue and share it with you guys. One another point I want to bring in is that I am unable to reproduce the issue when I am running on my local box , but when I deploy the code in yarn cluster with 34 executors the problem is easily reproduced . Similarly when I am using Spark. CreateStream with one partition the issue is not reproduced and when I am using spark DirectStream to consume kafka with 100 partitions the issue can be easily reproduced. The duplicates are not happening on the same executor as per log print, its happening on different executors . I don't know whether last point helps. On Sun, Nov 13, 2016 at 5:22 AM, ayan guha <guha.a...@gmail.com> wrote: > Have you tried rdd.distinc? > > On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <c...@koeninger.org> > wrote: > >> Can you come up with a minimal reproducible example? >> >> Probably unrelated, but why are you doing a union of 3 streams? >> >> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote: >> > There are no failures or errors. Irrespective of that I am seeing >> > duplicates. The steps and stages are all successful and even the >> speculation >> > is turned off . >> > >> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> >> wrote: >> >> >> >> Are you certain you aren't getting any failed tasks or other errors? >> >> Output actions like foreach aren't exactly once and will be retried on >> >> failures. >> >> >> >> >> >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote: >> >>> >> >>> Dear fellow Spark Users, >> >>> >> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >> >>> listens to Campaigns based on live stock feeds and the batch duration >> is 5 >> >>> seconds. The applications uses Kafka DirectStream and based on the >> feed >> >>> source there are three streams. As given in the code snippet I am >> doing a >> >>> union of three streams and I am trying to remove the duplicate >> campaigns >> >>> received using reduceByKey based on the customer and campaignId. I >> could see >> >>> lot of duplicate email being send out for the same key in the same >> batch.I >> >>> was expecting reduceByKey to remove the duplicate campaigns in a >> batch based >> >>> on customer and campaignId. In logs I am even printing the the >> key,batch >> >>> time before sending the email and I could clearly see duplicates. I >> could >> >>> see some duplicates getting removed after adding log in reduceByKey >> >>> Function, but its not eliminating completely . >> >>> >> >>> JavaDStream matchedCampaigns = >> >>> stream1.transform(CmpManager::getMatchedCampaigns) >> >>> .union(stream2).union(stream3).cache(); >> >>> >> >>> JavaPairDStream<String, Campaign> uniqueCampaigns = >> >>> matchedCampaigns.mapToPair(campaign->{ >> >>> String key=campaign.getCustomer()+"_"+campaign.getId(); >> >>> return new Tuple2<String, Campaigns>(key, campaign); >> >>> }) >> >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); >> >>> >> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >> >>> >> >>> I am not able to figure out where I am going wrong here . Please help >> me >> >>> here to get rid of this weird problem. Previously we were using >> createStream >> >>> for listening to Kafka Queue (number of partitions 1) , there we >> didn't face >> >>> this issue. 
But when we moved to directStream (number of partitions >> 100) we >> >>> could easily reproduce this issue on high load . >> >>> >> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds >> >>> instead of reduceByKey Operation, But even that didn't >> >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, >> Durations.Seconds(5), >> >>> Durations.Seconds(5)) >> >>> >> >>> I have even requested for help on Stackoverflow , But I haven't >> received >> >>> any solutions to this issue. >> >>> >> >>> Stack Overflow Link >> >>> >> >>> >> >>> https://stackoverflow.com/questions/40559858/spark-streaming >> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >> >>> >> >>> >> >>> Thanks and Regards >> >>> Dev >> > >> > >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > > > -- > Best Regards, > Ayan Guha >
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Have you tried rdd.distinc? On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <c...@koeninger.org> wrote: > Can you come up with a minimal reproducible example? > > Probably unrelated, but why are you doing a union of 3 streams? > > On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote: > > There are no failures or errors. Irrespective of that I am seeing > > duplicates. The steps and stages are all successful and even the > speculation > > is turned off . > > > > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> Are you certain you aren't getting any failed tasks or other errors? > >> Output actions like foreach aren't exactly once and will be retried on > >> failures. > >> > >> > >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote: > >>> > >>> Dear fellow Spark Users, > >>> > >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) > >>> listens to Campaigns based on live stock feeds and the batch duration > is 5 > >>> seconds. The applications uses Kafka DirectStream and based on the feed > >>> source there are three streams. As given in the code snippet I am > doing a > >>> union of three streams and I am trying to remove the duplicate > campaigns > >>> received using reduceByKey based on the customer and campaignId. I > could see > >>> lot of duplicate email being send out for the same key in the same > batch.I > >>> was expecting reduceByKey to remove the duplicate campaigns in a batch > based > >>> on customer and campaignId. In logs I am even printing the the > key,batch > >>> time before sending the email and I could clearly see duplicates. I > could > >>> see some duplicates getting removed after adding log in reduceByKey > >>> Function, but its not eliminating completely . > >>> > >>> JavaDStream matchedCampaigns = > >>> stream1.transform(CmpManager::getMatchedCampaigns) > >>> .union(stream2).union(stream3).cache(); > >>> > >>> JavaPairDStream<String, Campaign> uniqueCampaigns = > >>> matchedCampaigns.mapToPair(campaign->{ > >>> String key=campaign.getCustomer()+"_"+campaign.getId(); > >>> return new Tuple2<String, Campaigns>(key, campaign); > >>> }) > >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); > >>> > >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); > >>> > >>> I am not able to figure out where I am going wrong here . Please help > me > >>> here to get rid of this weird problem. Previously we were using > createStream > >>> for listening to Kafka Queue (number of partitions 1) , there we > didn't face > >>> this issue. But when we moved to directStream (number of partitions > 100) we > >>> could easily reproduce this issue on high load . > >>> > >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds > >>> instead of reduceByKey Operation, But even that didn't > >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, > Durations.Seconds(5), > >>> Durations.Seconds(5)) > >>> > >>> I have even requested for help on Stackoverflow , But I haven't > received > >>> any solutions to this issue. > >>> > >>> Stack Overflow Link > >>> > >>> > >>> https://stackoverflow.com/questions/40559858/spark- > streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch > >>> > >>> > >>> Thanks and Regards > >>> Dev > > > > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Can you come up with a minimal reproducible example? Probably unrelated, but why are you doing a union of 3 streams? On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote: > There are no failures or errors. Irrespective of that I am seeing > duplicates. The steps and stages are all successful and even the speculation > is turned off . > > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote: >> >> Are you certain you aren't getting any failed tasks or other errors? >> Output actions like foreach aren't exactly once and will be retried on >> failures. >> >> >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote: >>> >>> Dear fellow Spark Users, >>> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >>> listens to Campaigns based on live stock feeds and the batch duration is 5 >>> seconds. The applications uses Kafka DirectStream and based on the feed >>> source there are three streams. As given in the code snippet I am doing a >>> union of three streams and I am trying to remove the duplicate campaigns >>> received using reduceByKey based on the customer and campaignId. I could see >>> lot of duplicate email being send out for the same key in the same batch.I >>> was expecting reduceByKey to remove the duplicate campaigns in a batch based >>> on customer and campaignId. In logs I am even printing the the key,batch >>> time before sending the email and I could clearly see duplicates. I could >>> see some duplicates getting removed after adding log in reduceByKey >>> Function, but its not eliminating completely . >>> >>> JavaDStream matchedCampaigns = >>> stream1.transform(CmpManager::getMatchedCampaigns) >>> .union(stream2).union(stream3).cache(); >>> >>> JavaPairDStream<String, Campaign> uniqueCampaigns = >>> matchedCampaigns.mapToPair(campaign->{ >>> String key=campaign.getCustomer()+"_"+campaign.getId(); >>> return new Tuple2<String, Campaigns>(key, campaign); >>> }) >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); >>> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >>> >>> I am not able to figure out where I am going wrong here . Please help me >>> here to get rid of this weird problem. Previously we were using createStream >>> for listening to Kafka Queue (number of partitions 1) , there we didn't face >>> this issue. But when we moved to directStream (number of partitions 100) we >>> could easily reproduce this issue on high load . >>> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds >>> instead of reduceByKey Operation, But even that didn't >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), >>> Durations.Seconds(5)) >>> >>> I have even requested for help on Stackoverflow , But I haven't received >>> any solutions to this issue. >>> >>> Stack Overflow Link >>> >>> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >>> >>> >>> Thanks and Regards >>> Dev > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
There are no failures or errors. Irrespective of that I am seeing duplicates. The steps and stages are all successful and even the speculation is turned off . On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote: > Are you certain you aren't getting any failed tasks or other errors? > Output actions like foreach aren't exactly once and will be retried on > failures. > > On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote: > >> Dear fellow Spark Users, >> >> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >> listens to Campaigns based on live stock feeds and the batch duration is 5 >> seconds. The applications uses Kafka DirectStream and based on the feed >> source there are three streams. As given in the code snippet I am doing a >> union of three streams and I am trying to remove the duplicate campaigns >> received using reduceByKey based on the customer and campaignId. I could >> see lot of duplicate email being send out for the same key in the same >> batch.I was expecting reduceByKey to remove the duplicate campaigns in a >> batch based on customer and campaignId. In logs I am even printing the the >> key,batch time before sending the email and I could clearly see duplicates. >> I could see some duplicates getting removed after adding log in reduceByKey >> Function, but its not eliminating completely . >> >> JavaDStream matchedCampaigns = >> stream1.transform(CmpManager::getMatchedCampaigns) >> .union(stream2).union(stream3).cache(); >> JavaPairDStream<String, Campaign> uniqueCampaigns = >> matchedCampaigns.mapToPair(campaign->{ >> String key=campaign.getCustomer()+"_"+campaign.getId(); >> return new Tuple2<String, Campaigns>(key, campaign); >> }).reduceByKey((campaign1, campaign2)->{return campaign1;}); >> >> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >> >> I am not able to figure out where I am going wrong here . Please help me >> here to get rid of this weird problem. Previously we were using >> createStream for listening to Kafka Queue (number of partitions 1) , there >> we didn't face this issue. But when we moved to directStream (number of >> partitions 100) we could easily reproduce this issue on high load . >> >> *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds >> instead of reduceByKey Operation, But even that didn't help. >> uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), >> Durations.Seconds(5)) >> I have even requested for help on Stackoverflow , But I haven't received >> any solutions to this issue. >> >> >> *Stack Overflow Link* >> https://stackoverflow.com/questions/40559858/spark-streaming >> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >> >> >> Thanks and Regards >> Dev >> >
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Are you certain you aren't getting any failed tasks or other errors? Output actions like foreach aren't exactly once and will be retried on failures. On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote: > Dear fellow Spark Users, > > My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) > listens to Campaigns based on live stock feeds and the batch duration is 5 > seconds. The applications uses Kafka DirectStream and based on the feed > source there are three streams. As given in the code snippet I am doing a > union of three streams and I am trying to remove the duplicate campaigns > received using reduceByKey based on the customer and campaignId. I could > see lot of duplicate email being send out for the same key in the same > batch.I was expecting reduceByKey to remove the duplicate campaigns in a > batch based on customer and campaignId. In logs I am even printing the the > key,batch time before sending the email and I could clearly see duplicates. > I could see some duplicates getting removed after adding log in reduceByKey > Function, but its not eliminating completely . > > JavaDStream matchedCampaigns = > stream1.transform(CmpManager::getMatchedCampaigns) > .union(stream2).union(stream3).cache(); > JavaPairDStream<String, Campaign> uniqueCampaigns = > matchedCampaigns.mapToPair(campaign->{ > String key=campaign.getCustomer()+"_"+campaign.getId(); > return new Tuple2<String, Campaigns>(key, campaign); > }).reduceByKey((campaign1, campaign2)->{return campaign1;}); > > uniqueCampaigns.foreachRDD(CmpManager::sendEmail); > > I am not able to figure out where I am going wrong here . Please help me > here to get rid of this weird problem. Previously we were using > createStream for listening to Kafka Queue (number of partitions 1) , there > we didn't face this issue. But when we moved to directStream (number of > partitions 100) we could easily reproduce this issue on high load . > > *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds > instead of reduceByKey Operation, But even that didn't help. > uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), > Durations.Seconds(5)) > I have even requested for help on Stackoverflow , But I haven't received > any solutions to this issue. > > > *Stack Overflow Link* > https://stackoverflow.com/questions/40559858/spark- > streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch > > > Thanks and Regards > Dev >
Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Dear fellow Spark users,

My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster) listens to campaigns based on live stock feeds, and the batch duration is 5 seconds. The application uses the Kafka DirectStream, and based on the feed source there are three streams. As given in the code snippet, I am doing a union of the three streams and I am trying to remove the duplicate campaigns received, using reduceByKey keyed on the customer and campaignId. I can see a lot of duplicate emails being sent out for the same key in the same batch. I was expecting reduceByKey to remove the duplicate campaigns in a batch based on customer and campaignId. In the logs I am even printing the key and batch time before sending the email, and I can clearly see duplicates. I can see some duplicates getting removed after adding a log statement in the reduceByKey function, but it is not eliminating them completely.

JavaDStream matchedCampaigns = stream1.transform(CmpManager::getMatchedCampaigns)
    .union(stream2).union(stream3).cache();

JavaPairDStream<String, Campaign> uniqueCampaigns = matchedCampaigns.mapToPair(campaign -> {
    String key = campaign.getCustomer() + "_" + campaign.getId();
    return new Tuple2<String, Campaigns>(key, campaign);
}).reduceByKey((campaign1, campaign2) -> { return campaign1; });

uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I am not able to figure out where I am going wrong here. Please help me get rid of this weird problem. Previously we were using createStream for listening to the Kafka queue (number of partitions 1), and there we didn't face this issue. But when we moved to directStream (number of partitions 100) we could easily reproduce this issue under high load.

*Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds instead of the reduceByKey operation, but even that didn't help: uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), Durations.Seconds(5))

I have even requested help on Stack Overflow, but I haven't received any solutions to this issue.

*Stack Overflow Link*
https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch

Thanks and Regards
Dev
Re: Is there a reduceByKey functionality in DataFrame API?
Not really, a grouped DataFrame only provides SQL-like functions like sum and avg (at least in 1.5). > On 29.08.2016, at 14:56, ayan guha <guha.a...@gmail.com> wrote: > > If you are confused because of the name of two APIs. I think DF API name > groupBy came from SQL, but it works similarly as reducebykey. > > On 29 Aug 2016 20:57, "Marius Soutier" <mps@gmail.com > <mailto:mps@gmail.com>> wrote: > In DataFrames (and thus in 1.5 in general) this is not possible, correct? > >> On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca >> <mailto:hol...@pigscanfly.ca>> wrote: >> >> Hi Luis, >> >> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you >> can do groupBy followed by a reduce on the GroupedDataset ( >> http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset >> >> <http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset> >> ) - this works on a per-key basis despite the different name. In Spark 2.0 >> you would use groupByKey on the Dataset followed by reduceGroups ( >> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset >> >> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset> >> ). >> >> Cheers, >> >> Holden :) >> >> On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com >> <mailto:luismat...@gmail.com>> wrote: >> Hi everyone, >> >> Consider the following code: >> >> val result = df.groupBy("col1").agg(min("col2")) >> >> I know that rdd.reduceByKey(func) produces the same RDD as >> rdd.groupByKey().mapValues(value => value.reduce(func)) However reducerByKey >> is more efficient as it avoids shipping each value to the reducer doing the >> aggregation (it ships partial aggregations instead). >> >> I wonder whether the DataFrame API optimizes the code doing something >> similar to what RDD.reduceByKey does. >> >> I am using Spark 1.6.2. >> >> Regards, >> Luis >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html >> >> <http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html> >> Sent from the Apache Spark User List mailing list archive at Nabble.com >> <http://nabble.com/>. >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> <mailto:user-unsubscr...@spark.apache.org> >> >> >> >> >> -- >> Cell : 425-233-8271 >> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>
Re: Is there a reduceByKey functionality in DataFrame API?
If you are confused because of the name of two APIs. I think DF API name groupBy came from SQL, but it works similarly as reducebykey. On 29 Aug 2016 20:57, "Marius Soutier" <mps@gmail.com> wrote: > In DataFrames (and thus in 1.5 in general) this is not possible, correct? > > On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca> wrote: > > Hi Luis, > > You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you > can do groupBy followed by a reduce on the GroupedDataset ( > http://spark.apache.org/docs/1.6.2/api/scala/index. > html#org.apache.spark.sql.GroupedDataset ) - this works on a per-key > basis despite the different name. In Spark 2.0 you would use groupByKey on > the Dataset followed by reduceGroups ( http://spark.apache.org/ > docs/latest/api/scala/index.html#org.apache.spark.sql. > KeyValueGroupedDataset ). > > Cheers, > > Holden :) > > On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote: > >> Hi everyone, >> >> Consider the following code: >> >> val result = df.groupBy("col1").agg(min("col2")) >> >> I know that rdd.reduceByKey(func) produces the same RDD as >> rdd.groupByKey().mapValues(value => value.reduce(func)) However >> reducerByKey >> is more efficient as it avoids shipping each value to the reducer doing >> the >> aggregation (it ships partial aggregations instead). >> >> I wonder whether the DataFrame API optimizes the code doing something >> similar to what RDD.reduceByKey does. >> >> I am using Spark 1.6.2. >> >> Regards, >> Luis >> >> >> >> -- >> View this message in context: http://apache-spark-user-list. >> 1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality- >> in-DataFrame-API-tp27508.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com >> <http://nabble.com>. >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau > > >
Re: Is there a reduceByKey functionality in DataFrame API?
In DataFrames (and thus in 1.5 in general) this is not possible, correct? > On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca> wrote: > > Hi Luis, > > You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you > can do groupBy followed by a reduce on the GroupedDataset ( > http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset > > <http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset> > ) - this works on a per-key basis despite the different name. In Spark 2.0 > you would use groupByKey on the Dataset followed by reduceGroups ( > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset > > <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset> > ). > > Cheers, > > Holden :) > > On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com > <mailto:luismat...@gmail.com>> wrote: > Hi everyone, > > Consider the following code: > > val result = df.groupBy("col1").agg(min("col2")) > > I know that rdd.reduceByKey(func) produces the same RDD as > rdd.groupByKey().mapValues(value => value.reduce(func)) However reducerByKey > is more efficient as it avoids shipping each value to the reducer doing the > aggregation (it ships partial aggregations instead). > > I wonder whether the DataFrame API optimizes the code doing something > similar to what RDD.reduceByKey does. > > I am using Spark 1.6.2. > > Regards, > Luis > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html > > <http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html> > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > <mailto:user-unsubscr...@spark.apache.org> > > > > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>
Re: Is there a reduceByKey functionality in DataFrame API?
Hi Luis, You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you can do groupBy followed by a reduce on the GroupedDataset ( http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset ) - this works on a per-key basis despite the different name. In Spark 2.0 you would use groupByKey on the Dataset followed by reduceGroups ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset ). Cheers, Holden :) On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote: > Hi everyone, > > Consider the following code: > > val result = df.groupBy("col1").agg(min("col2")) > > I know that rdd.reduceByKey(func) produces the same RDD as > rdd.groupByKey().mapValues(value => value.reduce(func)) However > reducerByKey > is more efficient as it avoids shipping each value to the reducer doing the > aggregation (it ships partial aggregations instead). > > I wonder whether the DataFrame API optimizes the code doing something > similar to what RDD.reduceByKey does. > > I am using Spark 1.6.2. > > Regards, > Luis > > > > -- > View this message in context: http://apache-spark-user-list. > 1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame- > API-tp27508.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau
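For reference, a rough sketch of the Spark 2.0 form described above, assuming a case class Record(k: String, v: Long), a Dataset[Record] named ds, and spark.implicits._ in scope:

import org.apache.spark.sql.Dataset

case class Record(k: String, v: Long)

// groupByKey returns a KeyValueGroupedDataset; reduceGroups then reduces the
// values within each key, which is the Dataset analogue of RDD.reduceByKey.
val reduced: Dataset[(String, Record)] =
  ds.groupByKey(_.k)
    .reduceGroups((a, b) => Record(a.k, a.v + b.v))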
Is there a reduceByKey functionality in DataFrame API?
Hi everyone, Consider the following code: val result = df.groupBy("col1").agg(min("col2")) I know that rdd.reduceByKey(func) produces the same RDD as rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey is more efficient as it avoids shipping each value to the reducer doing the aggregation (it ships partial aggregations instead). I wonder whether the DataFrame API optimizes the code doing something similar to what RDD.reduceByKey does. I am using Spark 1.6.2. Regards, Luis -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations
SO it was indeed my merge function. I created new result object for every merge and its working now. Thanks On Wed, Jun 22, 2016 at 3:46 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > PS. In my reduceByKey operation I have two mutable object. What I do is > merge mutable2 into mutable1 and return mutable1. I read that it works for > aggregateByKey so thought it will work for reduceByKey as well. I might be > wrong here. Can someone verify if this will work or be un predictable? > > On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> Hi, >> >> I do not see any indication of errors or executor getting killed in spark >> UI - jobs, stages, event timelines. No task failures. I also don't see any >> errors in executor logs. >> >> Thanks >> >> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> For the run which returned incorrect result, did you observe any error >>> (on workers) ? >>> >>> Cheers >>> >>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> >>> wrote: >>> >>>> I have an RDD[String, MyObj] which is a result of Join + Map operation. >>>> It has no partitioner info. I run reduceByKey without passing any >>>> Partitioner or partition counts. I observed that output aggregation result >>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like >>>> reduce operation is joining values from two different keys. There is no >>>> configuration change between multiple runs. I am scratching my head over >>>> this. I verified results by printing out RDD before and after reduce >>>> operation; collecting subset at driver. >>>> >>>> Besides shuffle and storage memory fraction I use following options: >>>> >>>> sparkConf.set("spark.driver.userClassPathFirst","true") >>>> sparkConf.set("spark.unsafe.offHeap","true") >>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m") >>>> sparkConf.set("spark.serializer", >>>> "org.apache.spark.serializer.KryoSerializer") >>>> >>>> >>>> >>>> [image: What's New with Xactly] >>>> <http://www.xactlycorp.com/email-click/> >>>> >>>> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] >>>> <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] >>>> <https://twitter.com/Xactly> [image: Facebook] >>>> <https://www.facebook.com/XactlyCorp> [image: YouTube] >>>> <http://www.youtube.com/xactlycorporation> >>> >>> >>> >> > -- [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] <https://twitter.com/Xactly> [image: Facebook] <https://www.facebook.com/XactlyCorp> [image: YouTube] <http://www.youtube.com/xactlycorporation>
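For anyone hitting the same symptom, a sketch of the fix described above: build a brand-new result inside the merge function instead of handing back one of the (possibly reused) inputs. MyObj and pairs (an RDD[(String, MyObj)]) are stand-ins for the real types:

// Hypothetical mutable value class, purely for illustration.
class MyObj(var total: Long) extends Serializable

// Style that caused trouble in this thread: mutate one input in place and return it.
// val reduced = pairs.reduceByKey { (a, b) => a.total += b.total; a }

// Fix reported here: construct a fresh object from the two inputs.
val reduced = pairs.reduceByKey((a, b) => new MyObj(a.total + b.total))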
Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations
PS. In my reduceByKey operation I have two mutable object. What I do is merge mutable2 into mutable1 and return mutable1. I read that it works for aggregateByKey so thought it will work for reduceByKey as well. I might be wrong here. Can someone verify if this will work or be un predictable? On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I do not see any indication of errors or executor getting killed in spark > UI - jobs, stages, event timelines. No task failures. I also don't see any > errors in executor logs. > > Thanks > > On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote: > >> For the run which returned incorrect result, did you observe any error >> (on workers) ? >> >> Cheers >> >> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> >> wrote: >> >>> I have an RDD[String, MyObj] which is a result of Join + Map operation. >>> It has no partitioner info. I run reduceByKey without passing any >>> Partitioner or partition counts. I observed that output aggregation result >>> for given key is incorrect sometime. like 1 out of 5 times. It looks like >>> reduce operation is joining values from two different keys. There is no >>> configuration change between multiple runs. I am scratching my head over >>> this. I verified results by printing out RDD before and after reduce >>> operation; collecting subset at driver. >>> >>> Besides shuffle and storage memory fraction I use following options: >>> >>> sparkConf.set("spark.driver.userClassPathFirst","true") >>> sparkConf.set("spark.unsafe.offHeap","true") >>> sparkConf.set("spark.reducer.maxSizeInFlight","128m") >>> sparkConf.set("spark.serializer", >>> "org.apache.spark.serializer.KryoSerializer") >>> >>> >>> >>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> >>> >>> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] >>> <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] >>> <https://twitter.com/Xactly> [image: Facebook] >>> <https://www.facebook.com/XactlyCorp> [image: YouTube] >>> <http://www.youtube.com/xactlycorporation> >> >> >> > -- [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] <https://twitter.com/Xactly> [image: Facebook] <https://www.facebook.com/XactlyCorp> [image: YouTube] <http://www.youtube.com/xactlycorporation>
Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations
Hi, I do not see any indication of errors or executor getting killed in spark UI - jobs, stages, event timelines. No task failures. I also don't see any errors in executor logs. Thanks On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote: > For the run which returned incorrect result, did you observe any error (on > workers) ? > > Cheers > > On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> I have an RDD[String, MyObj] which is a result of Join + Map operation. >> It has no partitioner info. I run reduceByKey without passing any >> Partitioner or partition counts. I observed that output aggregation result >> for given key is incorrect sometime. like 1 out of 5 times. It looks like >> reduce operation is joining values from two different keys. There is no >> configuration change between multiple runs. I am scratching my head over >> this. I verified results by printing out RDD before and after reduce >> operation; collecting subset at driver. >> >> Besides shuffle and storage memory fraction I use following options: >> >> sparkConf.set("spark.driver.userClassPathFirst","true") >> sparkConf.set("spark.unsafe.offHeap","true") >> sparkConf.set("spark.reducer.maxSizeInFlight","128m") >> sparkConf.set("spark.serializer", >> "org.apache.spark.serializer.KryoSerializer") >> >> >> >> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> >> >> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] >> <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] >> <https://twitter.com/Xactly> [image: Facebook] >> <https://www.facebook.com/XactlyCorp> [image: YouTube] >> <http://www.youtube.com/xactlycorporation> > > > -- [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] <https://twitter.com/Xactly> [image: Facebook] <https://www.facebook.com/XactlyCorp> [image: YouTube] <http://www.youtube.com/xactlycorporation>
Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations
For the run which returned incorrect result, did you observe any error (on workers) ? Cheers On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > I have an RDD[String, MyObj] which is a result of Join + Map operation. It > has no partitioner info. I run reduceByKey without passing any Partitioner > or partition counts. I observed that output aggregation result for given > key is incorrect sometime. like 1 out of 5 times. It looks like reduce > operation is joining values from two different keys. There is no > configuration change between multiple runs. I am scratching my head over > this. I verified results by printing out RDD before and after reduce > operation; collecting subset at driver. > > Besides shuffle and storage memory fraction I use following options: > > sparkConf.set("spark.driver.userClassPathFirst","true") > sparkConf.set("spark.unsafe.offHeap","true") > sparkConf.set("spark.reducer.maxSizeInFlight","128m") > sparkConf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > > > > [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> > > <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] > <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] > <https://twitter.com/Xactly> [image: Facebook] > <https://www.facebook.com/XactlyCorp> [image: YouTube] > <http://www.youtube.com/xactlycorporation>
Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations
Hi, Could you check the issue also occurs in v1.6.1 and v2.0? // maropu On Wed, Jun 22, 2016 at 2:42 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > I have an RDD[String, MyObj] which is a result of Join + Map operation. It > has no partitioner info. I run reduceByKey without passing any Partitioner > or partition counts. I observed that output aggregation result for given > key is incorrect sometime. like 1 out of 5 times. It looks like reduce > operation is joining values from two different keys. There is no > configuration change between multiple runs. I am scratching my head over > this. I verified results by printing out RDD before and after reduce > operation; collecting subset at driver. > > Besides shuffle and storage memory fraction I use following options: > > sparkConf.set("spark.driver.userClassPathFirst","true") > sparkConf.set("spark.unsafe.offHeap","true") > sparkConf.set("spark.reducer.maxSizeInFlight","128m") > sparkConf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > > > > [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> > > <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] > <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] > <https://twitter.com/Xactly> [image: Facebook] > <https://www.facebook.com/XactlyCorp> [image: YouTube] > <http://www.youtube.com/xactlycorporation> -- --- Takeshi Yamamuro
Spark 1.5.2 - Different results from reduceByKey over multiple iterations
I have an RDD[String, MyObj] which is a result of Join + Map operation. It has no partitioner info. I run reduceByKey without passing any Partitioner or partition counts. I observed that output aggregation result for given key is incorrect sometime. like 1 out of 5 times. It looks like reduce operation is joining values from two different keys. There is no configuration change between multiple runs. I am scratching my head over this. I verified results by printing out RDD before and after reduce operation; collecting subset at driver. Besides shuffle and storage memory fraction I use following options: sparkConf.set("spark.driver.userClassPathFirst","true") sparkConf.set("spark.unsafe.offHeap","true") sparkConf.set("spark.reducer.maxSizeInFlight","128m") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") -- [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] <https://twitter.com/Xactly> [image: Facebook] <https://www.facebook.com/XactlyCorp> [image: YouTube] <http://www.youtube.com/xactlycorporation>
Re: Dataset - reduceByKey
Hi Bryan, What about groupBy [1] and agg [2]? What about UserDefinedAggregateFunction [3]? [1] https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@groupBy(col1:String,cols:String*):org.apache.spark.sql.RelationalGroupedDataset [2] https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset [3] https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Tue, Jun 7, 2016 at 8:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote: > Hello. > > I am looking at the option of moving RDD based operations to Dataset based > operations. We are calling 'reduceByKey' on some pair RDDs we have. What > would the equivalent be in the Dataset interface - I do not see a simple > reduceByKey replacement. > > Regards, > > Bryan Jeffrey > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
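A compact sketch of the UserDefinedAggregateFunction route mentioned above, used through groupBy(...).agg(...); the column names and the Long-sum logic are placeholders:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A trivial UDAF that sums a Long column; the real per-group logic goes here.
class LongSum extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// Usage, assuming a DataFrame/Dataset ds with columns "key" and "value".
val longSum = new LongSum
val perKey = ds.groupBy("key").agg(longSum(ds("value")))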
Re: Dataset - reduceByKey
All, Thank you for the replies. It seems as though the Dataset API is still far behind the RDD API. This is unfortunate as the Dataset API potentially provides a number of performance benefits. I will move to using it in a more limited set of cases for the moment. Thank you! Bryan Jeffrey On Tue, Jun 7, 2016 at 2:50 PM, Richard Marscher <rmarsc...@localytics.com> wrote: > There certainly are some gaps between the richness of the RDD API and the > Dataset API. I'm also migrating from RDD to Dataset and ran into > reduceByKey and join scenarios. > > In the spark-dev list, one person was discussing reduceByKey being > sub-optimal at the moment and it spawned this JIRA > https://issues.apache.org/jira/browse/SPARK-15598. But you might be able > to get by with groupBy().reduce() for now, check performance though. > > As for join, the approach would be using the joinWith function on Dataset. > Although the API isn't as sugary as it was for RDD IMO, something which > I've been discussing in a separate thread as well. I can't find a weblink > for it but the thread subject is "Dataset Outer Join vs RDD Outer Join". > > On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> > wrote: > >> It would also be nice if there was a better example of joining two >> Datasets. I am looking at the documentation here: >> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems >> a little bit sparse - is there a better documentation source? >> >> Regards, >> >> Bryan Jeffrey >> >> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> >> wrote: >> >>> Hello. >>> >>> I am looking at the option of moving RDD based operations to Dataset >>> based operations. We are calling 'reduceByKey' on some pair RDDs we have. >>> What would the equivalent be in the Dataset interface - I do not see a >>> simple reduceByKey replacement. >>> >>> Regards, >>> >>> Bryan Jeffrey >>> >>> >> > > > -- > *Richard Marscher* > Senior Software Engineer > Localytics > Localytics.com <http://localytics.com/> | Our Blog > <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | > Facebook <http://facebook.com/localytics> | LinkedIn > <http://www.linkedin.com/company/1148792?trk=tyah> >
Re: Dataset - reduceByKey
There certainly are some gaps between the richness of the RDD API and the Dataset API. I'm also migrating from RDD to Dataset and ran into reduceByKey and join scenarios. In the spark-dev list, one person was discussing reduceByKey being sub-optimal at the moment and it spawned this JIRA https://issues.apache.org/jira/browse/SPARK-15598. But you might be able to get by with groupBy().reduce() for now, check performance though. As for join, the approach would be using the joinWith function on Dataset. Although the API isn't as sugary as it was for RDD IMO, something which I've been discussing in a separate thread as well. I can't find a weblink for it but the thread subject is "Dataset Outer Join vs RDD Outer Join". On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote: > It would also be nice if there was a better example of joining two > Datasets. I am looking at the documentation here: > http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems > a little bit sparse - is there a better documentation source? > > Regards, > > Bryan Jeffrey > > On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> > wrote: > >> Hello. >> >> I am looking at the option of moving RDD based operations to Dataset >> based operations. We are calling 'reduceByKey' on some pair RDDs we have. >> What would the equivalent be in the Dataset interface - I do not see a >> simple reduceByKey replacement. >> >> Regards, >> >> Bryan Jeffrey >> >> > -- *Richard Marscher* Senior Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
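On the join side, a minimal sketch of joinWith, which keeps both sides as typed objects instead of flattening them into columns; the case classes and join keys are made up:

import org.apache.spark.sql.Dataset

case class Customer(id: Long, name: String)
case class Order(orderId: Long, customerId: Long, amount: Double)

// Assuming customers: Dataset[Customer] and orders: Dataset[Order].
val joined: Dataset[(Customer, Order)] =
  customers.joinWith(orders, customers("id") === orders("customerId"), "inner")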
Re: Dataset - reduceByKey
Seems you can see docs for 2.0 for now; https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/spark-2.0.0-SNAPSHOT-2016_06_07_07_01-1e2c931-docs/ // maropu On Tue, Jun 7, 2016 at 11:40 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote: > It would also be nice if there was a better example of joining two > Datasets. I am looking at the documentation here: > http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems > a little bit sparse - is there a better documentation source? > > Regards, > > Bryan Jeffrey > > On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> > wrote: > >> Hello. >> >> I am looking at the option of moving RDD based operations to Dataset >> based operations. We are calling 'reduceByKey' on some pair RDDs we have. >> What would the equivalent be in the Dataset interface - I do not see a >> simple reduceByKey replacement. >> >> Regards, >> >> Bryan Jeffrey >> >> > -- --- Takeshi Yamamuro
Re: Dataset - reduceByKey
It would also be nice if there was a better example of joining two Datasets. I am looking at the documentation here: http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems a little bit sparse - is there a better documentation source? Regards, Bryan Jeffrey On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote: > Hello. > > I am looking at the option of moving RDD based operations to Dataset based > operations. We are calling 'reduceByKey' on some pair RDDs we have. What > would the equivalent be in the Dataset interface - I do not see a simple > reduceByKey replacement. > > Regards, > > Bryan Jeffrey > >
Dataset - reduceByKey
Hello. I am looking at the option of moving RDD based operations to Dataset based operations. We are calling 'reduceByKey' on some pair RDDs we have. What would the equivalent be in the Dataset interface - I do not see a simple reduceByKey replacement. Regards, Bryan Jeffrey
How to change Spark DataFrame groupby("col1",..,"coln") into reduceByKey()?
Hi, I have a Spark job which does a group by, and I can't avoid it because of my use case. I have a large dataset, around 1 TB, which I need to process/update as a DataFrame. My job shuffles huge amounts of data and is slow because of the shuffling and the group by. One reason I see is that my data is skewed: some of my group by keys are empty. How do I avoid empty group by keys in a DataFrame? Does DataFrame avoid empty group by keys? I have around 8 keys on which I group by. sourceFrame.select("blabla").groupby("col1","col2","col3",..."col8").agg("bla bla"); How do I change the above code to use reduceByKey(), and can we apply aggregation with reduceByKey()? Please guide. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-change-Spark-DataFrame-groupby-col1-coln-into-reduceByKey-tp26998.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
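One way to keep the empty keys out of the aggregation is to filter them before the groupBy; the sketch below keeps the column names from the question, shrinks the aggregation to a single sum purely for illustration, and assumes the 8 key columns are strings with the value to sum at a known position:

import org.apache.spark.sql.functions._

// Drop rows whose first grouping key is null or empty before they are shuffled.
val nonEmpty = sourceFrame.filter(col("col1").isNotNull && col("col1").notEqual(""))

val aggregated = nonEmpty
  .groupBy("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8")
  .agg(sum("someValue"))   // "someValue" is a placeholder column name

// An RDD-level alternative: reduceByKey does support aggregation, it just
// needs an explicit combine function and a key built from the 8 columns.
val reducedRdd = nonEmpty.rdd
  .map { r =>
    val key = (0 to 7).map(i => r.getString(i)).mkString("|")  // composite key from the 8 columns
    (key, r.getDouble(8))                                      // value column position is an assumption
  }
  .reduceByKey(_ + _)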
Re: Executor memory requirement for reduceByKey
Even though it does not sound intuitive, reduceByKey expects all values for a particular key within a partition to be loaded into memory. So once you increase the number of partitions you can run the job.
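In code, the extra parallelism can be requested either directly on the reduce or by repartitioning first; a small sketch, assuming pairs is an RDD[(String, Long)] and 400 is just an example count:

// Ask reduceByKey for more reduce-side partitions directly...
val reduced = pairs.reduceByKey(_ + _, 400)

// ...or repartition up front and then reduce.
val reducedAlt = pairs.repartition(400).reduceByKey(_ + _)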
Re: Executor memory requirement for reduceByKey
Ok, so that worked flawlessly after I upped the number of partitions to 400 from 40. Thanks! On Fri, May 13, 2016 at 7:28 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote: > I'll try that, as of now I have a small number of partitions in the order > of 20~40. > > It would be great if there's some documentation on the memory requirement > wrt the number of keys and the number of partitions per executor (i.e., the > Spark's internal memory requirement outside of the user space). > > Otherwise, it's like shooting in the dark. > > On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> Have you taken a look at SPARK-11293 ? >> >> Consider using repartition to increase the number of partitions. >> >> FYI >> >> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung < >> coded...@cs.stanford.edu> wrote: >> >>> Hello, >>> >>> I'm using Spark version 1.6.0 and have trouble with memory when trying >>> to do reducebykey on a dataset with as many as 75 million keys. I.e. I get >>> the following exception when I run the task. >>> >>> There are 20 workers in the cluster. It is running under the standalone >>> mode with 12 GB assigned per executor and 4 cores per worker. The >>> spark.memory.fraction is set to 0.5 and I'm not using any caching. >>> >>> What might be the problem here? Since I'm using the version 1.6.0, this >>> doesn't seem to be related to SPARK-12155. This problem always happens >>> during the shuffle read phase. >>> >>> Is there a minimum amount of memory required for executor >>> (spark.memory.fraction) for shuffle read? >>> >>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0 >>> at >>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91) >>> at >>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735) >>> at >>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197) >>> at >>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212) >>> at >>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103) >>> at >>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483) >>> at >>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) >>> at >>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) >>> at >>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >>> at >>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >>> at >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>> at >>> 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> >> >
Re: Executor memory requirement for reduceByKey
I'll try that, as of now I have a small number of partitions in the order of 20~40. It would be great if there's some documentation on the memory requirement wrt the number of keys and the number of partitions per executor (i.e., the Spark's internal memory requirement outside of the user space). Otherwise, it's like shooting in the dark. On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Have you taken a look at SPARK-11293 ? > > Consider using repartition to increase the number of partitions. > > FYI > > On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung < > coded...@cs.stanford.edu> wrote: > >> Hello, >> >> I'm using Spark version 1.6.0 and have trouble with memory when trying to >> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the >> following exception when I run the task. >> >> There are 20 workers in the cluster. It is running under the standalone >> mode with 12 GB assigned per executor and 4 cores per worker. The >> spark.memory.fraction is set to 0.5 and I'm not using any caching. >> >> What might be the problem here? Since I'm using the version 1.6.0, this >> doesn't seem to be related to SPARK-12155. This problem always happens >> during the shuffle read phase. >> >> Is there a minimum amount of memory required for executor >> (spark.memory.fraction) for shuffle read? >> >> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0 >> at >> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91) >> at >> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735) >> at >> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197) >> at >> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212) >> at >> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103) >> at >> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483) >> at >> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) >> at >> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> >> >
Re: Executor memory requirement for reduceByKey
Have you taken a look at SPARK-11293 ? Consider using repartition to increase the number of partitions. FYI On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote: > Hello, > > I'm using Spark version 1.6.0 and have trouble with memory when trying to > do reducebykey on a dataset with as many as 75 million keys. I.e. I get the > following exception when I run the task. > > There are 20 workers in the cluster. It is running under the standalone > mode with 12 GB assigned per executor and 4 cores per worker. The > spark.memory.fraction is set to 0.5 and I'm not using any caching. > > What might be the problem here? Since I'm using the version 1.6.0, this > doesn't seem to be related to SPARK-12155. This problem always happens > during the shuffle read phase. > > Is there a minimum amount of memory required for executor > (spark.memory.fraction) for shuffle read? > > java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0 > at > org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91) > at > org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735) > at > org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197) > at > org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > >
Executor memory requirement for reduceByKey
Hello, I'm using Spark version 1.6.0 and have trouble with memory when trying to do reducebykey on a dataset with as many as 75 million keys. I.e. I get the following exception when I run the task. There are 20 workers in the cluster. It is running under the standalone mode with 12 GB assigned per executor and 4 cores per worker. The spark.memory.fraction is set to 0.5 and I'm not using any caching. What might be the problem here? Since I'm using the version 1.6.0, this doesn't seem to be related to SPARK-12155. This problem always happens during the shuffle read phase. Is there a minimum amount of memory required for executor (spark.memory.fraction) for shuffle read? java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91) at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735) at org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197) at org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212) at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Re: reduceByKey as Action or Transformation
On Monday 25 April 2016 11:28 PM, Weiping Qu wrote: Dear Ted, You are right. ReduceByKey is transformation. My fault. I would rephrase my question using following code snippet. object ScalaApp { def main(args: Array[String]): Unit ={ val conf = new SparkConf().setAppName("ScalaApp").setMaster("local") val sc = new SparkContext(conf) //val textFile: RDD[String] = val file = sc.textFile("/home/usr/test.dat") val output = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) output.persist() output.count() output.collect() } It's a simple code snippet. I realize that the first action count() would trigger the execution based on HadoopRDD, MapParititonRDD and the reduceByKey will take the ShuffleRDD as input to perform the count. The count() will trigger both the execution as well as the persistence of output RDD (as each partition is iterated). The second action collect just perform the collect over the same ShuffleRDD. It will use the persisted ShuffleRDD blocks. I think the re-calculation will also be carried out over ShuffleRDD instead of re-executing preceding HadoopRDD and MapParitionRDD in case one partition of persisted output is missing. Am I right? Since it is a partition of persisted ShuffleRDD that is missing, the partition will have to be recreated from the base HadoopRDD. To avoid it, one can checkpoint the ShuffleRDD if required. Thanks and Regards, Weiping regards -- Sumedh Wale SnappyData (http://www.snappydata.io) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
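A short sketch of the checkpointing suggested above, continuing the snippet from this thread (the checkpoint directory is a placeholder path):

sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder; use a reliable (e.g. HDFS) path in practice

val output = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

output.persist()
output.checkpoint()   // must be called before the first action on output
output.count()        // materializes the cache and writes the checkpoint
output.collect()      // a lost partition can now be restored from the checkpoint instead of the base file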
Re: reduceByKey as Action or Transformation
Dear Ted, You are right. ReduceByKey is transformation. My fault. I would rephrase my question using following code snippet. object ScalaApp { def main(args: Array[String]): Unit ={ val conf = new SparkConf().setAppName("ScalaApp").setMaster("local") val sc = new SparkContext(conf) //val textFile: RDD[String] = val file = sc.textFile("/home/usr/test.dat") val output = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) output.persist() output.count() output.collect() } It's a simple code snippet. I realize that the first action count() would trigger the execution based on HadoopRDD, MapParititonRDD and the reduceByKey will take the ShuffleRDD as input to perform the count. The second action collect just perform the collect over the same ShuffleRDD. I think the re-calculation will also be carried out over ShuffleRDD instead of re-executing preceding HadoopRDD and MapParitionRDD in case one partition of persisted output is missing. Am I right? Thanks and Regards, Weiping On 25.04.2016 17:46, Ted Yu wrote: Can you show snippet of your code which demonstrates what you observed ? Thansk On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu <q...@informatik.uni-kl.de <mailto:q...@informatik.uni-kl.de>> wrote: Thanks. I read that from the specification. I thought the way people distinguish actions and transformations depends on whether they are lazily executed or not. As far as I saw from my codes, the reduceByKey will be executed without any operations in the Action category. Please correct me if I am wrong. Thanks, Regards, Weiping On 25.04.2016 17 <tel:25.04.2016%2017>:20, Chadha Pooja wrote: Reduce By Key is a Transformation http://spark.apache.org/docs/latest/programming-guide.html#transformations Thanks _ Pooja Chadha Senior Architect THE BOSTON CONSULTING GROUP Mobile +1 617 794 3862 <tel:%2B1%20617%20794%203862> _ -Original Message- From: Weiping Qu [mailto:q...@informatik.uni-kl.de <mailto:q...@informatik.uni-kl.de>] Sent: Monday, April 25, 2016 11:05 AM To: u...@spark.incubator.apache.org <mailto:u...@spark.incubator.apache.org> Subject: reduceByKey as Action or Transformation Hi, I'd like just to verify that whether reduceByKey is transformation or actions. As written in RDD papers, spark flow will not be triggered only if actions are reached. I tried and saw that the my flow will be executed once there is a reduceByKey while it is categorized into transformations in Spark 1.6.1 specification. Thanks and Regards, Weiping - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org> __ The Boston Consulting Group, Inc. This e-mail message may contain confidential and/or privileged information. If you are not an addressee or otherwise authorized to receive this message, you should not use, copy, disclose or take any action based on this e-mail or any information contained in the message. If you have received this material in error, please advise the sender immediately by reply e-mail and delete this message. Thank you. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
Re: reduceByKey as Action or Transformation
Can you show snippet of your code which demonstrates what you observed ? Thansk On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu <q...@informatik.uni-kl.de> wrote: > Thanks. > I read that from the specification. > I thought the way people distinguish actions and transformations depends > on whether they are lazily executed or not. > As far as I saw from my codes, the reduceByKey will be executed without > any operations in the Action category. > Please correct me if I am wrong. > > Thanks, > Regards, > Weiping > > On 25.04.2016 17:20, Chadha Pooja wrote: > >> Reduce By Key is a Transformation >> >> http://spark.apache.org/docs/latest/programming-guide.html#transformations >> >> Thanks >> >> _ >> >> Pooja Chadha >> Senior Architect >> THE BOSTON CONSULTING GROUP >> Mobile +1 617 794 3862 >> >> >> _ >> >> >> >> -Original Message----- >> From: Weiping Qu [mailto:q...@informatik.uni-kl.de] >> Sent: Monday, April 25, 2016 11:05 AM >> To: u...@spark.incubator.apache.org >> Subject: reduceByKey as Action or Transformation >> >> Hi, >> >> I'd like just to verify that whether reduceByKey is transformation or >> actions. >> As written in RDD papers, spark flow will not be triggered only if >> actions are reached. >> I tried and saw that the my flow will be executed once there is a >> reduceByKey while it is categorized into transformations in Spark 1.6.1 >> specification. >> >> Thanks and Regards, >> Weiping >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >> __ >> The Boston Consulting Group, Inc. >> This e-mail message may contain confidential and/or privileged >> information. >> If you are not an addressee or otherwise authorized to receive this >> message, >> you should not use, copy, disclose or take any action based on this >> e-mail or >> any information contained in the message. If you have received this >> material >> in error, please advise the sender immediately by reply e-mail and delete >> this >> message. Thank you. >> > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: reduceByKey as Action or Transformation
Thanks. I read that from the specification. I thought the way people distinguish actions and transformations depends on whether they are lazily executed or not. As far as I saw from my codes, the reduceByKey will be executed without any operations in the Action category. Please correct me if I am wrong. Thanks, Regards, Weiping On 25.04.2016 17:20, Chadha Pooja wrote: Reduce By Key is a Transformation http://spark.apache.org/docs/latest/programming-guide.html#transformations Thanks _ Pooja Chadha Senior Architect THE BOSTON CONSULTING GROUP Mobile +1 617 794 3862 _ -Original Message- From: Weiping Qu [mailto:q...@informatik.uni-kl.de] Sent: Monday, April 25, 2016 11:05 AM To: u...@spark.incubator.apache.org Subject: reduceByKey as Action or Transformation Hi, I'd like just to verify that whether reduceByKey is transformation or actions. As written in RDD papers, spark flow will not be triggered only if actions are reached. I tried and saw that the my flow will be executed once there is a reduceByKey while it is categorized into transformations in Spark 1.6.1 specification. Thanks and Regards, Weiping - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org __ The Boston Consulting Group, Inc. This e-mail message may contain confidential and/or privileged information. If you are not an addressee or otherwise authorized to receive this message, you should not use, copy, disclose or take any action based on this e-mail or any information contained in the message. If you have received this material in error, please advise the sender immediately by reply e-mail and delete this message. Thank you. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
reduceByKey as Action or Transformation
Hi, I'd just like to verify whether reduceByKey is a transformation or an action. As written in the RDD paper, a Spark flow will not be triggered until an action is reached. I tried it and saw that my flow is executed once there is a reduceByKey, while it is categorized as a transformation in the Spark 1.6.1 documentation. Thanks and Regards, Weiping - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
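A quick way to observe the laziness being discussed (the path and data are placeholders):

val pairs = sc.textFile("/home/usr/test.dat")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))

val reduced = pairs.reduceByKey(_ + _)   // transformation: no job is launched yet
println(reduced.toDebugString)           // still no job, this only prints the lineage

reduced.count()                          // action: this is what actually triggers execution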
What is the relationship between reduceByKey and spark.driver.maxResultSize?
I have a job that is running into intermittent errors with [SparkDriver] java.lang.OutOfMemoryError: Java heap space. Before I was getting this error I was getting errors saying the result size exceeded spark.driver.maxResultSize. This does not make any sense to me, as there are no actions in my job that send data to the driver - just a pull of data from S3, a map and reduceByKey, and then a conversion to a DataFrame and a saveAsTable action that puts the results back on S3. I've found a few references to reduceByKey and spark.driver.maxResultSize having some importance, but cannot fathom how this setting could be related. Would greatly appreciate any advice. Thanks in advance, Tom
Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?
I think you are fetching too many results to the driver. Typically, it is not recommended to collect much data to driver. But if you have to, you can increase the driver memory, when submitting jobs. Thanks. Zhan Zhang On Dec 11, 2015, at 6:14 AM, Tom Seddon <mr.tom.sed...@gmail.com<mailto:mr.tom.sed...@gmail.com>> wrote: I have a job that is running into intermittent errors with [SparkDriver] java.lang.OutOfMemoryError: Java heap space. Before I was getting this error I was getting errors saying the result size exceed the spark.driver.maxResultSize. This does not make any sense to me, as there are no actions in my job that send data to the driver - just a pull of data from S3, a map and reduceByKey and then conversion to dataframe and saveAsTable action that puts the results back on S3. I've found a few references to reduceByKey and spark.driver.maxResultSize having some importance, but cannot fathom how this setting could be related. Would greatly appreciated any advice. Thanks in advance, Tom
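Concretely, the relevant knobs are the driver heap and spark.driver.maxResultSize; a sketch with arbitrary example sizes:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "2g")   // cap on the total size of results pulled back to the driver

// Driver heap itself is best set at submit time, for example:
//   spark-submit --driver-memory 8g --conf spark.driver.maxResultSize=2g ...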
Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?
Do you have a large number of tasks? This can happen if you have a large number of tasks and a small driver, or if you use accumulators of list-like data structures. 2015-12-11 11:17 GMT-08:00 Zhan Zhang <zzh...@hortonworks.com>: > I think you are fetching too many results to the driver. Typically, it is > not recommended to collect much data to driver. But if you have to, you can > increase the driver memory, when submitting jobs. > > Thanks. > > Zhan Zhang > > On Dec 11, 2015, at 6:14 AM, Tom Seddon <mr.tom.sed...@gmail.com> wrote: > > I have a job that is running into intermittent errors with [SparkDriver] > java.lang.OutOfMemoryError: Java heap space. Before I was getting this > error I was getting errors saying the result size exceed the > spark.driver.maxResultSize. > This does not make any sense to me, as there are no actions in my job that > send data to the driver - just a pull of data from S3, a map and > reduceByKey and then conversion to dataframe and saveAsTable action that > puts the results back on S3. > > I've found a few references to reduceByKey and spark.driver.maxResultSize > having some importance, but cannot fathom how this setting could be related. > > Would greatly appreciated any advice. > > Thanks in advance, > > Tom > > >
Re: Data in one partition after reduceByKey
public long getTime() Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT represented by this Date object. http://docs.oracle.com/javase/7/docs/api/java/util/Date.html#getTime%28%29 Based on what you did i might be easier to get date partitioner from that. Also, to get even more even distriubution you could use a hash function from that not just a remainder. -- Ruslan Dautkhanov On Mon, Nov 23, 2015 at 6:35 AM, Patrick McGloin <mcgloin.patr...@gmail.com> wrote: > I will answer my own question, since I figured it out. Here is my answer > in case anyone else has the same issue. > > My DateTimes were all without seconds and milliseconds since I wanted to > group data belonging to the same minute. The hashCode() for Joda DateTimes > which are one minute apart is a constant: > > scala> val now = DateTime.now > now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z > > scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode - > now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode > res42: Int = 6 > > As can be seen by this example, if the hashCode values are similarly > spaced, they can end up in the same partition: > > scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i) > nums: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,0), > (20,1), (40,2), (60,3), (80,4), (100,5), (120,6), (140,7), (160,8), (180,9), > (200,10), (220,11), (240,12), (260,13), (280,14), (300,15), (320,16), > (340,17), (360,18), (380,19), (400,20), (420,21), (440,22), (460,23), > (480,24), (500,25), (520,26), (540,27), (560,28), (580,29), (600,30), > (620,31), (640,32), (660,33), (680,34), (700,35), (720,36), (740,37), > (760,38), (780,39), (800,40), (820,41), (840,42), (860,43), (880,44), > (900,45), (920,46), (940,47), (960,48), (980,49), (0,50), (20,51), (40,52), > (60,53), (80,54), (100,55), (120,56), (140,57), (160,58), (180,59), (200,60), > (220,61), (240,62), (260,63), (280,64), (300,65), (320,66), (340,67), > (360,68), (380,69), (400,70), (420,71), (440,72), (460,73), (480,74), (500... 
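A sketch of the getTime()-style partitioner suggested above; Joda's getMillis plays the role of Date.getTime here, and the keys are assumed to be truncated to the minute as in the original use case:

import org.apache.spark.Partitioner
import org.joda.time.DateTime

class EpochMinutePartitioner(val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case dt: DateTime =>
      // Consecutive minutes land in consecutive partitions; hashing this
      // value (as suggested above) would spread the keys even more evenly.
      ((dt.getMillis / 60000L) % numPartitions).toInt
    case _ => 0
  }
}

// Usage: mapped.reduceByKey(new EpochMinutePartitioner(16), _ + _)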
> > scala> val rddNum = sc.parallelize(nums) > rddNum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at > parallelize at :23 > > scala> val reducedNum = rddNum.reduceByKey(_+_) > reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at > reduceByKey at :25 > > scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator, > true).collect.toList > > res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, > 0, 0) > > To distribute my data more evenly across the partitions I created my own > custom Partitoiner: > > class JodaPartitioner(rddNumPartitions: Int) extends Partitioner { > def numPartitions: Int = rddNumPartitions > def getPartition(key: Any): Int = { > key match { > case dateTime: DateTime => > val sum = dateTime.getYear + dateTime.getMonthOfYear + > dateTime.getDayOfMonth + dateTime.getMinuteOfDay + dateTime.getSecondOfDay > sum % numPartitions > case _ => 0 > } > } > } > > > On 20 November 2015 at 17:17, Patrick McGloin <mcgloin.patr...@gmail.com> > wrote: > >> Hi, >> >> I have Spark application which contains the following segment: >> >> val reparitioned = rdd.repartition(16) >> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, >> endDate) >> val mapped: RDD[(DateTime, myData)] = filtered.map(kv=(kv._1.processingTime, >> kv._2)) >> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_) >> >> When I run this with some logging this is what I see: >> >> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, >> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)] >> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, >> 2066, 2032, 2001, 2031, 2101, 2050, 2068)] >> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, >> 2066, 2032, 2001, 2031, 2101, 2050, 2068)] >> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)] >> >> My logging is done using these two lines: >> >> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, >> true)log.info(s"rdd ==> [${sizes.collect.toList}]") >> >> My question is why does my data end up in one partition after the >> reduceByKey? After the filter it can be seen that the data is evenly >> distributed, but the reduceByKey results in data in only one partition. >> >> Thanks, >> >> Patrick >> > >
Re: Data in one partition after reduceByKey
I will answer my own question, since I figured it out. Here is my answer in case anyone else has the same issue. My DateTimes were all without seconds and milliseconds since I wanted to group data belonging to the same minute. The hashCode() for Joda DateTimes which are one minute apart is a constant: scala> val now = DateTime.now now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode - now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode res42: Int = 6 As can be seen by this example, if the hashCode values are similarly spaced, they can end up in the same partition: scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i) nums: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6), (140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13), (280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20), (420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27), (560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34), (700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41), (840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48), (980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55), (120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62), (260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69), (400,70), (420,71), (440,72), (460,73), (480,74), (500... scala> val rddNum = sc.parallelize(nums) rddNum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at :23 scala> val reducedNum = rddNum.reduceByKey(_+_) reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at reduceByKey at :25 scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator, true).collect.toList res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) To distribute my data more evenly across the partitions I created my own custom Partitoiner: class JodaPartitioner(rddNumPartitions: Int) extends Partitioner { def numPartitions: Int = rddNumPartitions def getPartition(key: Any): Int = { key match { case dateTime: DateTime => val sum = dateTime.getYear + dateTime.getMonthOfYear + dateTime.getDayOfMonth + dateTime.getMinuteOfDay + dateTime.getSecondOfDay sum % numPartitions case _ => 0 } } } On 20 November 2015 at 17:17, Patrick McGloin <mcgloin.patr...@gmail.com> wrote: > Hi, > > I have Spark application which contains the following segment: > > val reparitioned = rdd.repartition(16) > val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, > endDate) > val mapped: RDD[(DateTime, myData)] = filtered.map(kv=(kv._1.processingTime, > kv._2)) > val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_) > > When I run this with some logging this is what I see: > > reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, > 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)] > filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, > 2066, 2032, 2001, 2031, 2101, 2050, 2068)] > mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, > 2066, 2032, 2001, 2031, 2101, 2050, 2068)] > reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)] > > My logging is done using these two lines: > > val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, > true)log.info(s"rdd ==> [${sizes.collect.toList}]") > > My question is why does my 
data end up in one partition after the > reduceByKey? After the filter it can be seen that the data is evenly > distributed, but the reduceByKey results in data in only one partition. > > Thanks, > > Patrick >
Data in one partition after reduceByKey
Hi, I have Spark application which contains the following segment: val reparitioned = rdd.repartition(16) val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, endDate) val mapped: RDD[(DateTime, myData)] = filtered.map(kv=(kv._1.processingTime, kv._2)) val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_) When I run this with some logging this is what I see: reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)] filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)] mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)] reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)] My logging is done using these two lines: val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true)log.info(s"rdd ==> [${sizes.collect.toList}]") My question is why does my data end up in one partition after the reduceByKey? After the filter it can be seen that the data is evenly distributed, but the reduceByKey results in data in only one partition. Thanks, Patrick
Re: Incorrect results with reduceByKey
Deep copying the data solved the issue: data.map(r => {val t = SpecificData.get().deepCopy(r.getSchema, r); (t.id, List(t)) }).reduceByKey(_ ++ _) (noted here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1003) Thanks Igor Berman, for pointing that out. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-results-with-reduceByKey-tp25410p25420.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
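Spelling out the one-liner above with comments: the id field and the RDD named data come from the original post below; everything else is standard Avro API. A sketch, assuming T is an Avro specific record class.

import org.apache.avro.specific.SpecificData

// The Hadoop input format reuses one record object per split, so records that are
// kept around (here: collected into Lists across rows) must be copied first.
val good = data
  .map { r =>
    val copy = SpecificData.get().deepCopy(r.getSchema, r)  // detach from the reused buffer
    (copy.id, List(copy))
  }
  .reduceByKey(_ ++ _)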
Incorrect results with reduceByKey
Howdy, We've noticed a strange behavior with Avro serialized data and reduceByKey RDD functionality. Please see below: // We're reading a bunch of Avro serialized data val data: RDD[T] = sparkContext.hadoopFile(path, classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable]) // Incorrect data returned val bad: RDD[(String,List[T])] = data.map(r => (r.id, List(r))).reduceByKey(_ ++ _) // After adding the partitioner we get everything as expected val good: RDD[(String,List[T])] = data.map(r => (r.id, List(r))).partitionBy(Partitioner.defaultPartitioner(data)).reduceByKey(_ ++ _) Any ideas? Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-results-with-reduceByKey-tp25410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: job hangs when using pipe() with reduceByKey()
Hi. What is slow exactly? In code base 1, when you run persist() + count() you store the result in RAM, and the map + reduceByKey is then done on in-memory data. In the latter case (all in one line) you are doing both steps at the same time. So are you saying that if you sum up the time for both steps in the first code base, it is still much faster than the latter code base? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25248.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
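For what it is worth, a rough way to answer that question empirically in spark-shell is sketched below. This is not from the thread; the reduce function was elided in the original post, so _ + _ is used here only to make the snippet compile, and rdd and ./my_cpp_program are the objects from that post.

// Crude wall-clock timing helper.
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

// Variant 1: materialise the pipe() output, then reduce the cached data.
val piped = time("pipe + persist + count") {
  val a = rdd.pipe("./my_cpp_program").persist()
  a.count()
  a
}
time("map + reduceByKey on persisted data") {
  piped.map(s => (s, 1)).reduceByKey(_ + _).count()
}

// Variant 2: the whole lineage in one job.
time("single fused pipeline") {
  rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey(_ + _).count()
}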
Re: job hangs when using pipe() with reduceByKey()
Yes, the first code takes only 30 minutes, but with the second method I waited for 5 hours and it had only finished 10%. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25249.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
job hangs when using pipe() with reduceByKey()
I meet a situation: When I use val a = rdd.pipe("./my_cpp_program").persist() a.count() // just use it to persist a val b = a.map(s => (s, 1)).reduceByKey().count() it 's so fast but when I use val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count() it is so slow and there are many such log in my executors: 15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling in-memory map of 633.1 MB to disk (9 times so far) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
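A general (not thread-specific) suggestion when the executors spill the way the ExternalSorter log above shows: keep the expensive pipe() output around in serialized form and give reduceByKey more partitions, so each reduce task holds a smaller in-memory map. In the sketch below the partition count of 400 is a placeholder, and _ + _ stands in for the reduce function that was elided in the original post.

import org.apache.spark.storage.StorageLevel

// Persist the piped output once, serialized, so later passes do not rerun the C++ program.
val piped = rdd.pipe("./my_cpp_program").persist(StorageLevel.MEMORY_AND_DISK_SER)

// More reduce partitions => smaller per-task maps => fewer spills to disk.
val counts = piped.map(s => (s, 1)).reduceByKey(_ + _, 400)
counts.count()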
Re:Re: job hangs when using pipe() with reduceByKey()
spark 1.4.1 hadoop 2.6.0 centos 6.6 At 2015-10-31 23:14:46, "Ted Yu" <yuzhih...@gmail.com> wrote: Which Spark release are you using ? Which OS ? Thanks On Sat, Oct 31, 2015 at 5:18 AM, hotdog <lisend...@163.com> wrote: I meet a situation: When I use val a = rdd.pipe("./my_cpp_program").persist() a.count() // just use it to persist a val b = a.map(s => (s, 1)).reduceByKey().count() it 's so fast but when I use val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count() it is so slow and there are many such log in my executors: 15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling in-memory map of 633.1 MB to disk (8 times so far) 15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling in-memory map of 633.1 MB to disk (9 times so far) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: job hangs when using pipe() with reduceByKey()
Which Spark release are you using ? Which OS ? Thanks On Sat, Oct 31, 2015 at 5:18 AM, hotdog <lisend...@163.com> wrote: > I meet a situation: > When I use > val a = rdd.pipe("./my_cpp_program").persist() > a.count() // just use it to persist a > val b = a.map(s => (s, 1)).reduceByKey().count() > it 's so fast > > but when I use > val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count() > it is so slow > and there are many such log in my executors: > 15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling > in-memory map of 633.1 MB to disk (8 times so far) > 15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling > in-memory map of 633.1 MB to disk (8 times so far) > 15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling > in-memory map of 633.1 MB to disk (8 times so far) > 15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling > in-memory map of 633.1 MB to disk (8 times so far) > 15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling > in-memory map of 633.1 MB to disk (9 times so far) > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Does using Custom Partitioner before calling reduceByKey improve performance?
Hi, We currently use reduceByKey to reduce by a particular metric name in our Streaming/Batch job. It seems to be doing a lot of shuffles and it has an impact on performance. Does using a custom partitioner before calling reduceByKey improve performance? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
If you just want to control the number of reducers, then setting the numPartitions is sufficient. If you want to control how exact partitioning scheme (that is some other scheme other than hash-based) then you need to implement a custom partitioner. It can be used to improve data skews, etc. which ultimately improves performance. On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote: > Hi, > > We currently use reduceByKey to reduce by a particular metric name in our > Streaming/Batch job. It seems to be doing a lot of shuffles and it has > impact on performance. Does using a custompartitioner before calling > reduceByKey improve performance? > > > Thanks, > Swetha > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
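As a sketch of the two options mentioned above: reduceByKey accepts either a plain partition count or a Partitioner instance. The keys in this thread are not shown, so the routing rule below (partition by a 4-character key prefix) is made up, and metrics is an assumed RDD of (String, Long) pairs.

import org.apache.spark.Partitioner

// Illustrative non-hash scheme: keys that share a prefix land in the same partition.
class PrefixPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String =>
      val h = s.take(4).hashCode % numPartitions
      if (h < 0) h + numPartitions else h
    case _ => 0
  }
}

// Option 1: only control the number of reducers.
val byCount = metrics.reduceByKey(_ + _, 200)

// Option 2: control the partitioning scheme itself.
val byCustomScheme = metrics.reduceByKey(new PrefixPartitioner(200), _ + _)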
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
So, Wouldn't using a customPartitioner on the rdd upon which the groupByKey or reduceByKey is performed avoid shuffles and improve performance? My code does groupByAndSort and reduceByKey on different datasets as shown below. Would using a custom partitioner on those datasets before using a groupByKey or reduceByKey improve performance? My idea is to avoid shuffles and improve performance. Also, right now I see a lot of spills when there is a very large dataset for groupByKey and reduceByKey. I think the memory is not sufficient. We need to group by sessionId and then sort the Jsons based on the timeStamp as shown in the below code. What is the alternative to using groupByKey for better performance? And in case of reduceByKey, would using a customPartitioner on the RDD upon which the reduceByKey is performed would reduce the shuffles and improve the performance? rdd.partitionBy(customPartitioner) def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = { val grpdRecs = rdd.groupByKey(); val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1)) srtdRecs } rdd.reduceByKey((a, b) => { (Math.max(a._1, b._1), (a._2 ++ b._2)) }) On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> wrote: > If you just want to control the number of reducers, then setting the > numPartitions is sufficient. If you want to control how exact partitioning > scheme (that is some other scheme other than hash-based) then you need to > implement a custom partitioner. It can be used to improve data skews, etc. > which ultimately improves performance. > > On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote: > >> Hi, >> >> We currently use reduceByKey to reduce by a particular metric name in our >> Streaming/Batch job. It seems to be doing a lot of shuffles and it has >> impact on performance. Does using a custompartitioner before calling >> reduceByKey improve performance? >> >> >> Thanks, >> Swetha >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
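One commonly suggested alternative to the groupByKey-plus-sort pattern asked about above is repartitionAndSortWithinPartitions, which sorts during the shuffle instead of buffering whole per-key groups in memory. The sketch below is not from this thread; it reuses the method signature quoted above, and the partition count of 200 is a placeholder.

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Partition by sessionId only, so that sorting by the composite (sessionId, timestamp)
// key keeps each session's records together and already ordered by timestamp.
class SessionPartitioner(partitions: Int) extends Partitioner {
  private val hash = new HashPartitioner(partitions)
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key match {
    case (sessionId: String, _) => hash.getPartition(sessionId)
    case other => hash.getPartition(other)
  }
}

def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  rdd
    .map { case (sessionId, (ts, json)) => ((sessionId, ts), json) }
    .repartitionAndSortWithinPartitions(new SessionPartitioner(200))
    .mapPartitions { iter =>
      // Records arrive grouped by sessionId and sorted by timestamp, so each session
      // can be assembled in one linear pass without a per-key in-memory group.
      val buffered = iter.buffered
      new Iterator[(String, List[(Long, String)])] {
        def hasNext: Boolean = buffered.hasNext
        def next(): (String, List[(Long, String)]) = {
          val sessionId = buffered.head._1._1
          val records = List.newBuilder[(Long, String)]
          while (buffered.hasNext && buffered.head._1._1 == sessionId) {
            val ((_, ts), json) = buffered.next()
            records += ((ts, json))
          }
          (sessionId, records.result())
        }
      }
    }
}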
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
if you specify the same partitioner (custom or otherwise) for both partitionBy and groupBy, then may be it will help. The fundamental problem is groupByKey, that takes a lot of working memory. 1. Try to avoid groupByKey. What is it that you want to after sorting the list of grouped events? can you do that operation with a reduceByKey? 2. If not, use more partitions. That would cause lesser data in each partition, so less spilling. 3. You can control the amount memory allocated for shuffles by changing the configuration spark.shuffle.memoryFraction . More fraction would cause less spilling. On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <swethakasire...@gmail.com > wrote: > So, Wouldn't using a customPartitioner on the rdd upon which the > groupByKey or reduceByKey is performed avoid shuffles and improve > performance? My code does groupByAndSort and reduceByKey on different > datasets as shown below. Would using a custom partitioner on those datasets > before using a groupByKey or reduceByKey improve performance? My idea is > to avoid shuffles and improve performance. Also, right now I see a lot of > spills when there is a very large dataset for groupByKey and reduceByKey. I > think the memory is not sufficient. We need to group by sessionId and then > sort the Jsons based on the timeStamp as shown in the below code. > > > What is the alternative to using groupByKey for better performance? And in > case of reduceByKey, would using a customPartitioner on the RDD upon which > the reduceByKey is performed would reduce the shuffles and improve the > performance? > > > rdd.partitionBy(customPartitioner) > > def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, > List[(Long, String)])] = > { val grpdRecs = rdd.groupByKey(); val srtdRecs = > grpdRecs.mapValues[(List[(Long, String)])](iter => > iter.toList.sortBy(_._1)) srtdRecs } > > rdd.reduceByKey((a, b) => { > (Math.max(a._1, b._1), (a._2 ++ b._2)) > }) > > > > On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> > wrote: > >> If you just want to control the number of reducers, then setting the >> numPartitions is sufficient. If you want to control how exact partitioning >> scheme (that is some other scheme other than hash-based) then you need to >> implement a custom partitioner. It can be used to improve data skews, etc. >> which ultimately improves performance. >> >> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> >> wrote: >> >>> Hi, >>> >>> We currently use reduceByKey to reduce by a particular metric name in our >>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has >>> impact on performance. Does using a custompartitioner before calling >>> reduceByKey improve performance? >>> >>> >>> Thanks, >>> Swetha >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >
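For readers who want to try points 2 and 3 from the reply above, a sketch follows. The fraction and partition count are placeholder values, the reduce function is the one quoted from the earlier message, and rdd stands for the same dataset used there.

import org.apache.spark.{SparkConf, SparkContext}

// spark.shuffle.memoryFraction must be set before the SparkContext is created;
// it defaults to 0.2 in these Spark versions, so raising it leaves more room for shuffles.
val conf = new SparkConf()
  .setAppName("sessionization")
  .set("spark.shuffle.memoryFraction", "0.4")
val sc = new SparkContext(conf)

// Point 2: ask reduceByKey for more partitions so each task buffers less data.
val reduced = rdd.reduceByKey((a, b) => (Math.max(a._1, b._1), a._2 ++ b._2), 400)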
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
If it is streaming, you can look at updateStateByKey for maintaining active sessions. But wont work for batch. and I answered that before. it can improve performance if you change the partitioning scheme from hash-based to something else. Its hard to say anything beyond that without understand the data skew and other details of your application. Before jumping into that, you should simple change the number of partitions and see if the performance improves. On Tue, Oct 27, 2015 at 7:10 PM, swetha kasireddy <swethakasire...@gmail.com > wrote: > After sorting the list of grouped events I would need to have an RDD that > has a key which is nothing but the sessionId and a list of values that are > sorted by timeStamp for each input Json. So basically the return type would > be RDD[(String, List[(Long, String)] where the key is the sessionId and > a list of tuples that has a timeStamp and Json as the values. I will need > to use groupByKey to do a groupBy sessionId and secondary sort by timeStamp > and then get the list of JsonValues in a sorted order. Is there any > alternative for that? Please find the code below that I used for the same. > > > Also, does using a customPartitioner for a reduceByKey improve performance? > > > def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, > List[(Long, String)])] = > { val grpdRecs = rdd.groupByKey(); val srtdRecs = > grpdRecs.mapValues[(List[(Long, String)])](iter => > iter.toList.sortBy(_._1)) srtdRecs } > > > On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> > wrote: > >> if you specify the same partitioner (custom or otherwise) for both >> partitionBy and groupBy, then may be it will help. The fundamental problem >> is groupByKey, that takes a lot of working memory. >> 1. Try to avoid groupByKey. What is it that you want to after sorting the >> list of grouped events? can you do that operation with a reduceByKey? >> 2. If not, use more partitions. That would cause lesser data in each >> partition, so less spilling. >> 3. You can control the amount memory allocated for shuffles by changing >> the configuration spark.shuffle.memoryFraction . More fraction would cause >> less spilling. >> >> >> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy < >> swethakasire...@gmail.com> wrote: >> >>> So, Wouldn't using a customPartitioner on the rdd upon which the >>> groupByKey or reduceByKey is performed avoid shuffles and improve >>> performance? My code does groupByAndSort and reduceByKey on different >>> datasets as shown below. Would using a custom partitioner on those datasets >>> before using a groupByKey or reduceByKey improve performance? My idea is >>> to avoid shuffles and improve performance. Also, right now I see a lot of >>> spills when there is a very large dataset for groupByKey and reduceByKey. I >>> think the memory is not sufficient. We need to group by sessionId and then >>> sort the Jsons based on the timeStamp as shown in the below code. >>> >>> >>> What is the alternative to using groupByKey for better performance? And >>> in case of reduceByKey, would using a customPartitioner on the RDD upon >>> which the reduceByKey is performed would reduce the shuffles and improve >>> the performance? 
>>> >>> >>> rdd.partitionBy(customPartitioner) >>> >>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): >>> RDD[(String, List[(Long, String)])] = >>> { val grpdRecs = rdd.groupByKey(); val srtdRecs = >>> grpdRecs.mapValues[(List[(Long, String)])](iter => >>> iter.toList.sortBy(_._1)) srtdRecs } >>> >>> rdd.reduceByKey((a, b) => { >>> (Math.max(a._1, b._1), (a._2 ++ b._2)) >>> }) >>> >>> >>> >>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> >>> wrote: >>> >>>> If you just want to control the number of reducers, then setting the >>>> numPartitions is sufficient. If you want to control how exact partitioning >>>> scheme (that is some other scheme other than hash-based) then you need to >>>> implement a custom partitioner. It can be used to improve data skews, etc. >>>> which ultimately improves performance. >>>> >>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> We currently use reduceByKey to reduce by a particular metric name in >>>>> our >>>>> Streamin
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
After sorting the list of grouped events I would need to have an RDD that has a key which is nothing but the sessionId and a list of values that are sorted by timeStamp for each input Json. So basically the return type would be RDD[(String, List[(Long, String)] where the key is the sessionId and a list of tuples that has a timeStamp and Json as the values. I will need to use groupByKey to do a groupBy sessionId and secondary sort by timeStamp and then get the list of JsonValues in a sorted order. Is there any alternative for that? Please find the code below that I used for the same. Also, does using a customPartitioner for a reduceByKey improve performance? def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = { val grpdRecs = rdd.groupByKey(); val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1)) srtdRecs } On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote: > if you specify the same partitioner (custom or otherwise) for both > partitionBy and groupBy, then may be it will help. The fundamental problem > is groupByKey, that takes a lot of working memory. > 1. Try to avoid groupByKey. What is it that you want to after sorting the > list of grouped events? can you do that operation with a reduceByKey? > 2. If not, use more partitions. That would cause lesser data in each > partition, so less spilling. > 3. You can control the amount memory allocated for shuffles by changing > the configuration spark.shuffle.memoryFraction . More fraction would cause > less spilling. > > > On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy < > swethakasire...@gmail.com> wrote: > >> So, Wouldn't using a customPartitioner on the rdd upon which the >> groupByKey or reduceByKey is performed avoid shuffles and improve >> performance? My code does groupByAndSort and reduceByKey on different >> datasets as shown below. Would using a custom partitioner on those datasets >> before using a groupByKey or reduceByKey improve performance? My idea is >> to avoid shuffles and improve performance. Also, right now I see a lot of >> spills when there is a very large dataset for groupByKey and reduceByKey. I >> think the memory is not sufficient. We need to group by sessionId and then >> sort the Jsons based on the timeStamp as shown in the below code. >> >> >> What is the alternative to using groupByKey for better performance? And >> in case of reduceByKey, would using a customPartitioner on the RDD upon >> which the reduceByKey is performed would reduce the shuffles and improve >> the performance? >> >> >> rdd.partitionBy(customPartitioner) >> >> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, >> List[(Long, String)])] = >> { val grpdRecs = rdd.groupByKey(); val srtdRecs = >> grpdRecs.mapValues[(List[(Long, String)])](iter => >> iter.toList.sortBy(_._1)) srtdRecs } >> >> rdd.reduceByKey((a, b) => { >> (Math.max(a._1, b._1), (a._2 ++ b._2)) >> }) >> >> >> >> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> >> wrote: >> >>> If you just want to control the number of reducers, then setting the >>> numPartitions is sufficient. If you want to control how exact partitioning >>> scheme (that is some other scheme other than hash-based) then you need to >>> implement a custom partitioner. It can be used to improve data skews, etc. >>> which ultimately improves performance. 
>>> >>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> >>> wrote: >>> >>>> Hi, >>>> >>>> We currently use reduceByKey to reduce by a particular metric name in >>>> our >>>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has >>>> impact on performance. Does using a custompartitioner before calling >>>> reduceByKey improve performance? >>>> >>>> >>>> Thanks, >>>> Swetha >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >> >
Re: Configuring Spark for reduceByKey on on massive data sets
Hi Daniel, Did you solve your problem? I met the same problem when running reduceByKey on massive data on YARN. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-Spark-for-reduceByKey-on-on-massive-data-sets-tp5966p25023.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "Too many open files" exception on reduceByKey
It turns out the mesos can overwrite the OS ulimit -n setting. So we have increased the mesos slave ulimit -n setting. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p25019.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "Too many open files" exception on reduceByKey
You are right, I did find that mesos overwrite this to a smaller number.So we will modify that and try to run again. Thanks! Tian On Thursday, October 8, 2015 4:18 PM, DB Tsai <dbt...@dbtsai.com> wrote: Try to run to see actual ulimit. We found that mesos overrides the ulimit which causes the issue. import sys.process._ val p = 1 to 100 val rdd = sc.parallelize(p, 100) val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect Sincerely, DB Tsai --Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang <tzhang...@yahoo.com> wrote: I hit this issue with spark 1.3.0 stateful application (with updateStateByKey) function on mesos. It will fail after running fine for about 24 hours. The error stack trace as below, I checked ulimit -n and we have very large numbers set on the machines. What else can be wrong? 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal): java.io.FileNotFoundException: /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index (Too many open files) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:221) at java.io.FileOutputStream.(FileOutputStream.java:171) at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "Too many open files" exception on reduceByKey
I hit this issue with spark 1.3.0 stateful application (with updateStateByKey) function on mesos. It will fail after running fine for about 24 hours. The error stack trace as below, I checked ulimit -n and we have very large numbers set on the machines. What else can be wrong? 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal): java.io.FileNotFoundException: /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index (Too many open files) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:221) at java.io.FileOutputStream.(FileOutputStream.java:171) at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "Too many open files" exception on reduceByKey
Try to run to see actual ulimit. We found that mesos overrides the ulimit which causes the issue. import sys.process._ val p = 1 to 100 val rdd = sc.parallelize(p, 100) val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D <https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D> On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang <tzhang...@yahoo.com> wrote: > I hit this issue with spark 1.3.0 stateful application (with > updateStateByKey) function on mesos. It will > fail after running fine for about 24 hours. > The error stack trace as below, I checked ulimit -n and we have very large > numbers set on the machines. > What else can be wrong? > 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage > 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal): > java.io.FileNotFoundException: > > /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index > (Too many open files) > at java.io.FileOutputStream.open(Native Method) > at java.io.FileOutputStream.(FileOutputStream.java:221) > at java.io.FileOutputStream.(FileOutputStream.java:171) > at > > org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85) > at > > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: How to make Group By/reduceByKey more efficient?
All the *ByKey aggregations perform an efficient shuffle and preserve partitioning on the output. If all you need is to call reduceByKey, then don’t bother with groupBy. You should use groupBy if you really need all the datapoints from a key for a very custom operation. From the docs: Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. What you should worry about in more complex pipelines is that you’re actually preserving the partitioner between stages. For example, if you use a custom partitioner between a partitionBy and an updateStateBy key. Or if you use .map or .flatMap instead of .mapValues and .flatMapValues. By the way, learn to use the Spark UI to understand the DAG / Execution plan and try to navigate the source code - I found the comments and the various preservePartitioner options very educational. -adrian On 9/23/15, 8:43 AM, "swetha" <swethakasire...@gmail.com> wrote: >Hi, > >How to make Group By more efficient? Is it recommended to use a custom >partitioner and then do a Group By? And can we use a custom partitioner and >then use a reduceByKey for optimization? > > >Thanks, >Swetha > > > >-- >View this message in context: >http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html >Sent from the Apache Spark User List mailing list archive at Nabble.com. > >- >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >For additional commands, e-mail: user-h...@spark.apache.org >
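A small spark-shell illustration of the .map versus .mapValues point made above, with made-up data:

import org.apache.spark.HashPartitioner

val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(new HashPartitioner(8), _ + _)

counts.partitioner                                      // Some(HashPartitioner) -- set by reduceByKey
counts.mapValues(_ * 2).partitioner                     // still Some(...) -- keys untouched, partitioner kept
counts.map { case (k, v) => (k, v * 2) }.partitioner    // None -- Spark cannot know the keys were unchanged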
reduceByKey inside updateStateByKey in Spark Streaming???
Hi, How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of keys for which I need to do sum and average inside the updateStateByKey by joining with old state. How do I accomplish that? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: reduceByKey inside updateStateByKey in Spark Streaming???
The 2 operations can't be used inside one another. If you need something like an all time average then you need to keep a tuple (sum, count) to which you add all the new values that come in every batch. The average is then just a map on the state DStream. Makes sense? have I guessed your use case? Sent from my iPhone > On 24 Sep 2015, at 19:47, swetha <swethakasire...@gmail.com> wrote: > > Hi, > > How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of > keys for which I need to do sum and average inside the updateStateByKey by > joining with old state. How do I accomplish that? > > > Thanks, > Swetha > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
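A sketch of the (sum, count) state described above, for a DStream of (key, value) pairs. The stream name metricStream and the Double/Long types are illustrative, not from the thread.

// State per key is the running (sum, count); each batch folds its new values in.
val updateFn = (newValues: Seq[Double], state: Option[(Double, Long)]) => {
  val (oldSum, oldCount) = state.getOrElse((0.0, 0L))
  Some((oldSum + newValues.sum, oldCount + newValues.size))
}

val sumsAndCounts = metricStream.updateStateByKey(updateFn)

// The all-time average is then just a map over the state DStream.
val averages = sumsAndCounts.mapValues { case (sum, count) => sum / count }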
How to make Group By/reduceByKey more efficient?
Hi, How to make Group By more efficient? Is it recommended to use a custom partitioner and then do a Group By? And can we use a custom partitioner and then use a reduceByKey for optimization? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
reduceByKey not working on JavaPairDStream
Hi, I have applied mapToPair and then a reduceByKey on a DStream to obtain a JavaPairDStreamString, MapString, Object. I have to apply a flatMapToPair and reduceByKey on the DSTream Obtained above. But i do not see any logs from reduceByKey operation. Can anyone explain why is this happening..? find My Code Below - * /*** * GroupLevel1 Groups - articleId, host and tags */* JavaPairDStreamString, MapString, Object groupLevel1 = inputDataMap .mapToPair( new PairFunctionMapString, Object, String, MapString, Object() { private static final long serialVersionUID = 5196132687044875422L; @Override public Tuple2String, MapString, Object call( MapString, Object map) throws Exception { String host = (String) map.get(host); String articleId = (String) map.get(articleId); List tags = (List) map.get(tags); if (host == null || articleId == null) { logger.error(*** Error Doc \n + map); } String key = articleId_ + articleId + _host_ + host + _tags_ + tags.toString(); //logger.info(key); System.out.println(Printing Key - + key); map.put(articlecount, 1L); return new Tuple2String, MapString, Object(key, map); } }) .reduceByKey( new Function2MapString, Object, MapString, Object, MapString, Object() { private static final long serialVersionUID = 1L; @Override public MapString, Object call( MapString, Object map1, MapString, Object map2) throws Exception { Long count1 = (Long) map1.get(articlecount); Long count2 = (Long) map2.get(articlecount); map1.put(articlecount, count1 + count2); return map1; } }); */*** * Grouping level 1 groups on articleId+host+tags * Tags can be multiple for an article. * Grouping level 2 does - * 1. For each tag in a row, find occurrence of that tag in other rows. * 2. If one tag found in another row, then add the articleCount of current and new row and put as articleCount for that tag. * Note - * Idea behind this grouping is to get all article counts that contain a particular tag and preserve this value. */* JavaPairDStreamString, MapString, Object groupLevel2 = groupLevel1.flatMapToPair(new PairFlatMapFunctionTuple2String, MapString, Object, String, MapString, Object() { @Override public IterableTuple2String, MapString, Object call(Tuple2String, MapString, Object stringMapTuple2) throws Exception { System.out.println(group level 2 tuple 1 - + stringMapTuple2._1()); System.out.println(group level 2 tuple 2 - + stringMapTuple2._2()); ArrayListString tagList = (ArrayListString) stringMapTuple2._2().get(tags); ArrayList tagKeyList = new ArrayList(); String host = (String) stringMapTuple2._2().get(host); StringBuilder key; for (String tag : tagList) { key = new StringBuilder(host_).append(host).append(_tag_).append(tag); System.out.println(generated Key - +key); tagKeyList.add(new Tuple2String, MapString, Object(key.toString(), stringMapTuple2._2())); } return tagKeyList; } }); groupLevel2 = groupLevel2.reduceByKey(new Function2MapString, Object, MapString, Object, MapString, Object() { @Override public MapString, Object call(MapString, Object dataMap1, MapString, Object dataMap2) throws Exception { System.out.println(Type of article map in 1 + dataMap1.get(articleId).getClass()); System.out.println(Type of article map in 2 + dataMap2.get(articleId).getClass()); MapString, String articleMap1 = (MapString, String) dataMap1.get(articleId); MapString, String articleMap2 = (MapString, String) dataMap2.get(articleId); if (articleMap1 == null || articleMap1.isEmpty()) { System.out.println(returning because map 1 null
Re: reduceByKey not working on JavaPairDStream
I don't see that you invoke any action in this code. It won't do anything unless you tell it to perform an action that requires the transformations. On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote: Hi, I have applied mapToPair and then a reduceByKey on a DStream to obtain a JavaPairDStreamString, MapString, Object. I have to apply a flatMapToPair and reduceByKey on the DSTream Obtained above. But i do not see any logs from reduceByKey operation. Can anyone explain why is this happening..? find My Code Below - /*** * GroupLevel1 Groups - articleId, host and tags */ JavaPairDStreamString, MapString, Object groupLevel1 = inputDataMap .mapToPair( new PairFunctionMapString, Object, String, MapString, Object() { private static final long serialVersionUID = 5196132687044875422L; @Override public Tuple2String, MapString, Object call( MapString, Object map) throws Exception { String host = (String) map.get(host); String articleId = (String) map.get(articleId); List tags = (List) map.get(tags); if (host == null || articleId == null) { logger.error(*** Error Doc \n + map); } String key = articleId_ + articleId + _host_ + host + _tags_ + tags.toString(); //logger.info(key); System.out.println(Printing Key - + key); map.put(articlecount, 1L); return new Tuple2String, MapString, Object(key, map); } }) .reduceByKey( new Function2MapString, Object, MapString, Object, MapString, Object() { private static final long serialVersionUID = 1L; @Override public MapString, Object call( MapString, Object map1, MapString, Object map2) throws Exception { Long count1 = (Long) map1.get(articlecount); Long count2 = (Long) map2.get(articlecount); map1.put(articlecount, count1 + count2); return map1; } }); /*** * Grouping level 1 groups on articleId+host+tags * Tags can be multiple for an article. * Grouping level 2 does - * 1. For each tag in a row, find occurrence of that tag in other rows. * 2. If one tag found in another row, then add the articleCount of current and new row and put as articleCount for that tag. * Note - * Idea behind this grouping is to get all article counts that contain a particular tag and preserve this value. */ JavaPairDStreamString, MapString, Object groupLevel2 = groupLevel1.flatMapToPair(new PairFlatMapFunctionTuple2String, MapString, Object, String, MapString, Object() { @Override public IterableTuple2String, MapString, Object call(Tuple2String, MapString, Object stringMapTuple2) throws Exception { System.out.println(group level 2 tuple 1 - + stringMapTuple2._1()); System.out.println(group level 2 tuple 2 - + stringMapTuple2._2()); ArrayListString tagList = (ArrayListString) stringMapTuple2._2().get(tags); ArrayList tagKeyList = new ArrayList(); String host = (String) stringMapTuple2._2().get(host); StringBuilder key; for (String tag : tagList) { key = new StringBuilder(host_).append(host).append(_tag_).append(tag); System.out.println(generated Key - +key); tagKeyList.add(new Tuple2String, MapString, Object(key.toString(), stringMapTuple2._2())); } return tagKeyList; } }); groupLevel2 = groupLevel2.reduceByKey(new Function2MapString, Object, MapString, Object, MapString, Object() { @Override public MapString, Object call(MapString, Object dataMap1, MapString, Object dataMap2) throws Exception { System.out.println(Type of article map in 1 + dataMap1.get(articleId).getClass()); System.out.println(Type of article map in 2 + dataMap2.get(articleId).getClass
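The point above in compact form, shown in Scala for brevity; the same applies to JavaPairDStream. The names below (lines, keyOf, merge, ssc) are placeholders, not the code from this thread.

// DStream transformations are lazy: the reduceByKey never runs until an output
// operation is registered and the StreamingContext is started.
val pairs = lines.map(record => (keyOf(record), record))
val reduced = pairs.reduceByKey(merge)

reduced.print()            // any output operation works: print, foreachRDD, saveAsTextFiles, ...
ssc.start()
ssc.awaitTermination()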
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Please find fix info for users who are following the mail chain of this issue and the respective solution below: *reduceByKey: Non working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array() is empty *reduceByKey: Working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf).set(spark.driver.allowMultipleContexts,true) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array((0,3),(1,5),(2,4)) Regards, Satish Chandra On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Currently using DSE 4.7 and Spark 1.2.2 version Regards, Satish On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote: What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.spark val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs.reduceByKey((x,y) = x + y).collect yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 Spark context available as sc. SQL context available as sqlContext. Loading test.spark... pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:21 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong -- Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Currently using DSE 4.7 and Spark 1.2.2 version Regards, Satish On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote: What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.spark val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs.reduceByKey((x,y) = x + y).collect yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 Spark context available as sc. SQL context available as sqlContext. Loading test.spark... pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:21 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong -- Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
RE: Transformation not happening for reduceByKey or GroupByKey
I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap spark-shell command, underline it is just invoking spark-shell. I don't know too much about the original problem though. Yong Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin,Yes, it is DSE but issue is related to Spark only Regards,Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin,Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collectres43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards,Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collectres43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All,I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code:RDD.reduceByKey((x,y) = x+y)RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards,Satish -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Any inputs for the actual problem statement Regards, Satish On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote: Yong, Thanks for your reply. I tried spark-shell -i script-file, it works fine for me. Not sure the different with dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote: I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap spark-shell command, underline it is just invoking spark-shell. I don't know too much about the original problem though. Yong -- Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
You had: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) => x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at <console>:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Transformation not happening for reduceByKey or GroupByKey
HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
RE: Transformation not happening for reduceByKey or GroupByKey
What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.sparkval pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs.reduceByKey((x,y) = x + y).collectyzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.sparkWelcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4Spark context available as sc.SQL context available as sqlContext.Loading test.spark...pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:2115/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yesres0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards,Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
Yes, DSE 4.7

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

Not sure, never used dse - it’s part of DataStax Enterprise, right?

On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:

HI Robin,
Yes, the below piece of code works fine in the Spark shell, but when the same is placed in a script file and executed with -i <file name> it creates an empty RDD.

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output.

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:

HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI Robin,
Yes, it is DSE, but the issue is related to Spark only.

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

Not sure, never used dse - it’s part of DataStax Enterprise, right?

On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:

HI Robin,
Yes, the below piece of code works fine in the Spark shell, but when the same is placed in a script file and executed with -i <file name> it creates an empty RDD.

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output.

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:

HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Satish,
I don't see where Spark supports -i, so I suspect it is provided by DSE. In that case, it might be a bug in DSE.

On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote:

HI Robin,
Yes, it is DSE, but the issue is related to Spark only.

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

Not sure, never used dse - it’s part of DataStax Enterprise, right?

On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:

HI Robin,
Yes, the below piece of code works fine in the Spark shell, but when the same is placed in a script file and executed with -i <file name> it creates an empty RDD.

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output.

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:

HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish

--
Best Regards
Jeff Zhang
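One way to isolate whether DSE's script handling is the culprit (the file name below is only illustrative, and this assumes the stock 1.x shell forwards -i to the underlying Scala REPL) is to run the identical script through plain spark-shell and compare:

// sumByKey.scala -- illustrative script contents
val pairs = sc.makeRDD(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))
val summed = pairs.reduceByKey((x, y) => x + y)
// collect forces evaluation so the result is printed even in script mode
println(summed.collect().mkString(", "))

// Run with the plain shell instead of `dse spark`:
//   $SPARK_HOME/bin/spark-shell --master local -i sumByKey.scala
// If this prints (0,3), (1,50), (2,40) while the dse invocation does not,
// the problem sits in how DSE feeds the script to the shell.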
Re: Transformation not happening for reduceByKey or GroupByKey
HI Robin,
Yes, the below piece of code works fine in the Spark shell, but when the same is placed in a script file and executed with -i <file name> it creates an empty RDD.

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output.

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:

HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish
Transformation not happening for reduceByKey or GroupByKey
HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI All,
Could anybody let me know what it is that I am missing here? It should work, as it is a basic transformation. Please let me know if any additional information is required.

Regards,
Satish

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com wrote:

HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
I am expecting output as Array((0,3),(1,50),(2,40)), just a sum of the values for each key.
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()
Command used: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
Please let me know what is missing in my code, as my resultant Array is empty.
Regards,
Satish
Re: Apache Spark : Custom function for reduceByKey - missing arguments for method
Did you try adding the `_` after the method names to partially apply them? Scala is saying that it is trying to immediately apply those methods but can't find arguments, whereas you are trying to pass them along as functions (which they aren't). Here is a link to a Stack Overflow answer that should help clarify: http://stackoverflow.com/a/19720808/72401. I think there are two solutions: turn getMax and getMin into function values using val, e.g.:

val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a > b) a else b }
val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a < b) a else b }

or just partially apply them:

maxVector = attribMap.reduceByKey( getMax _ )
minVector = attribMap.reduceByKey( getMin _ )

On Thu, Jul 9, 2015 at 9:09 PM, ameyamm ameya.malond...@outlook.com wrote:

I am trying to normalize a dataset (convert values for all attributes in the vector to the 0-1 range). I created an RDD of tuples (attrib-name, attrib-value) for all the records in the dataset as follows:

val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap(
  contact => {
    List(
      ("dage", contact.dage match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value) ; case None => null })
    )
  }
)

Here, contactDataset is of the type RDD[Contact]. The fields of the Contact class are of type Option[Long].

DoubleDimension is a simple wrapper over the Double datatype. It extends the Ordered trait and implements the corresponding compare and equals methods.

To obtain the max and min attribute vectors for computing the normalized values:

maxVector = attribMap.reduceByKey( getMax )
minVector = attribMap.reduceByKey( getMin )

Implementation of getMax and getMin is as follows:

def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a > b) a else b }
def getMin( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a < b) a else b }

I get a compile error at the calls to getMax and getMin stating:

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error: missing arguments for method getMax in class DatasetReader;
[ERROR] follow this method with '_' if you want to treat it as a partially applied function
[ERROR] maxVector = attribMap.reduceByKey( getMax )
[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error: missing arguments for method getMin in class DatasetReader;
[ERROR] follow this method with '_' if you want to treat it as a partially applied function
[ERROR] minVector = attribMap.reduceByKey( getMin )

I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as far as I know, I can pass any method to reduceByKey as long as the function is of the type f : (V, V) => V. I am really stuck here. Please help.
--
Richard Marscher
Software Engineer
Localytics
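A compact, self-contained sketch of both fixes described above; the DoubleDimension here is only a simplified stand-in for the poster's class, which is not shown in the thread:

import org.apache.spark.rdd.RDD

// Simplified stand-in for the DoubleDimension wrapper described in the post.
case class DoubleDimension(value: Double)

// Option 1: function values, which can be passed to reduceByKey directly.
val getMaxF: (DoubleDimension, DoubleDimension) => DoubleDimension =
  (a, b) => if (a.value > b.value) a else b
val getMinF: (DoubleDimension, DoubleDimension) => DoubleDimension =
  (a, b) => if (a.value < b.value) a else b

// Option 2: keep them as methods and eta-expand with `_` at the call site.
def getMax(a: DoubleDimension, b: DoubleDimension): DoubleDimension =
  if (a.value > b.value) a else b
def getMin(a: DoubleDimension, b: DoubleDimension): DoubleDimension =
  if (a.value < b.value) a else b

def normalizeBounds(attribMap: RDD[(String, DoubleDimension)]) = {
  val maxVector = attribMap.reduceByKey(getMaxF)    // function value, passed directly
  val minVector = attribMap.reduceByKey(getMin _)   // method, partially applied
  (maxVector, minVector)
}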
Apache Spark : Custom function for reduceByKey - missing arguments for method
I am trying to normalize a dataset (convert values for all attributes in the vector to the 0-1 range). I created an RDD of tuples (attrib-name, attrib-value) for all the records in the dataset as follows:

val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap(
  contact => {
    List(
      ("dage", contact.dage match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value) ; case None => null })
    )
  }
)

Here, contactDataset is of the type RDD[Contact]. The fields of the Contact class are of type Option[Long].

DoubleDimension is a simple wrapper over the Double datatype. It extends the Ordered trait and implements the corresponding compare and equals methods.

To obtain the max and min attribute vectors for computing the normalized values:

maxVector = attribMap.reduceByKey( getMax )
minVector = attribMap.reduceByKey( getMin )

Implementation of getMax and getMin is as follows:

def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a > b) a else b }
def getMin( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a < b) a else b }

I get a compile error at the calls to getMax and getMin stating:

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error: missing arguments for method getMax in class DatasetReader;
[ERROR] follow this method with '_' if you want to treat it as a partially applied function
[ERROR] maxVector = attribMap.reduceByKey( getMax )
[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error: missing arguments for method getMin in class DatasetReader;
[ERROR] follow this method with '_' if you want to treat it as a partially applied function
[ERROR] minVector = attribMap.reduceByKey( getMin )

I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as far as I know, I can pass any method to reduceByKey as long as the function is of the type f : (V, V) => V. I am really stuck here. Please help.
Re: run reduceByKey on huge data in spark
"I'm using 50 servers, 35 executors per server, 140GB memory per server"

35 executors *per server* sounds kind of odd to me. With 35 executors per server and each server having 140GB, each executor is going to get only 4GB, and that 4GB will be divided into the shuffle/storage memory fractions... Assuming the default storage memory fraction of 0.6, that is 2.4GB of working space for each executor, so if any partition size (key group size) exceeds 2.4GB there will be an OOM... Maybe you can try a smaller number of executors per server/node...
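For illustration only (the right numbers depend on the cluster and on YARN container sizing), a sketch of a layout with fewer, larger executors; the property names are standard Spark settings, but the values are made up:

import org.apache.spark.{SparkConf, SparkContext}

// Example layout: 5 executors per 140 GB node instead of 35, so each executor
// gets a much larger heap to absorb big key groups during the shuffle.
val conf = new SparkConf()
  .setAppName("reduceByKey-large-input")
  .set("spark.executor.instances", "250")   // 50 nodes x 5 executors (illustrative, YARN only)
  .set("spark.executor.cores", "7")         // illustrative
  .set("spark.executor.memory", "25g")      // leaves headroom for YARN overhead

val sc = new SparkContext(conf)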
run reduceByKey on huge data in spark
I'm running reduceByKey in Spark. My program is the simplest Spark example:

val counts = textFile.flatMap(line => line.split(" "))
  .repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
counts.saveAsTextFile("hdfs://...")

but it always runs out of memory...

I'm using 50 servers, 35 executors per server, 140GB memory per server.

The document volume is: 8TB of documents, 20 billion documents, 1000 billion words in total, and the words after the reduce will be about 100 million.

I wonder how to set the Spark configuration. What values should these parameters be?
1. the number of maps? 2 for example?
2. the number of reduces? 1 for example?
3. other parameters?
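One thing that stands out in the snippet above is reduceByKey(_ + _, 1), which funnels the whole reduce side into a single partition, and repartition(2) similarly caps the map side at two tasks. A sketch of the same job with more parallelism; the partition count 2000 is only a placeholder, not a recommendation:

// Same word count, but without the repartition(2) bottleneck and with many
// reduce partitions instead of 1, so no single task has to hold the whole
// reduced map in memory.
val counts = textFile
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 2000)   // second argument = number of reduce partitions

counts.saveAsTextFile("hdfs://...")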
Re: run reduceByKey on huge data in spark
Hello,
I'm using Spark 1.4.2-SNAPSHOT and I'm running in YARN mode :-)
I wonder if spark.shuffle.memoryFraction or spark.shuffle.manager would help? How do I set these parameters...

On 1 July 2015, at 1:32 AM, Ted Yu yuzhih...@gmail.com wrote:

Which Spark release are you using? Are you running in standalone mode?

Cheers

On Tue, Jun 30, 2015 at 10:03 AM, hotdog lisend...@163.com wrote:

I'm running reduceByKey in Spark. My program is the simplest Spark example:

val counts = textFile.flatMap(line => line.split(" "))
  .repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
counts.saveAsTextFile("hdfs://...")

but it always runs out of memory...

I'm using 50 servers, 35 executors per server, 140GB memory per server.

The document volume is: 8TB of documents, 20 billion documents, 1000 billion words in total, and the words after the reduce will be about 100 million.

I wonder how to set the Spark configuration. What values should these parameters be?
1. the number of maps? 2 for example?
2. the number of reduces? 1 for example?
3. other parameters?
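Both properties are ordinary Spark 1.x configuration settings, so they can be set on the SparkConf before the context is created or passed with --conf at submit time; a small sketch with illustrative values, not recommendations:

import org.apache.spark.SparkConf

// Spark 1.x shuffle knobs (superseded by unified memory management in later releases).
val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")          // "sort" is already the default since 1.2
  .set("spark.shuffle.memoryFraction", "0.4")    // default 0.2; heap fraction for shuffle buffers
  .set("spark.storage.memoryFraction", "0.4")    // default 0.6; shrink storage to make room

// Command-line equivalent:
//   spark-submit --conf spark.shuffle.manager=sort --conf spark.shuffle.memoryFraction=0.4 ...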
Re: run reduceByKey on huge data in spark
Which Spark release are you using? Are you running in standalone mode?

Cheers

On Tue, Jun 30, 2015 at 10:03 AM, hotdog lisend...@163.com wrote:

I'm running reduceByKey in Spark. My program is the simplest Spark example:

val counts = textFile.flatMap(line => line.split(" "))
  .repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
counts.saveAsTextFile("hdfs://...")

but it always runs out of memory...

I'm using 50 servers, 35 executors per server, 140GB memory per server.

The document volume is: 8TB of documents, 20 billion documents, 1000 billion words in total, and the words after the reduce will be about 100 million.

I wonder how to set the Spark configuration. What values should these parameters be?
1. the number of maps? 2 for example?
2. the number of reduces? 1 for example?
3. other parameters?
reduceByKey - add values to a list
Hi,
I am trying to see what the best way is to reduce the values of an RDD of (key, value) pairs into (key, ListOfValues) pairs. I know various ways of achieving this, but I am looking for an efficient, elegant one-liner if there is one.

Example:
Input RDD: (USA, California), (UK, Yorkshire), (USA, Colorado)
Output RDD: (USA, [California, Colorado]), (UK, Yorkshire)

Is it possible to use reduceByKey or foldByKey to achieve this, instead of groupByKey? Something equivalent to a cons operator from LISP, so that I could just say reduceByKey(lambda x, y: (cons x y)). Maybe it is more a Python question than a Spark question: how to create a list from 2 elements without starting from an empty list?

Thanks,
Kannappan
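For what it's worth, a sketch in Scala (used here to match the rest of this digest; the PySpark version has the same shape) of two ways to build the lists without groupByKey:

// Pair RDD matching the example in the question.
val pairs = sc.parallelize(Seq(
  ("USA", "California"), ("UK", "Yorkshire"), ("USA", "Colorado")))

// Option 1: wrap each value in a one-element list, then concatenate.
// A one-liner, at the cost of a small throwaway list per record.
val viaReduce = pairs.map { case (k, v) => (k, List(v)) }.reduceByKey(_ ++ _)

// Option 2: aggregateByKey grows a single list per key per partition,
// which avoids the per-record allocations.
val viaAggregate = pairs.aggregateByKey(List.empty[String])(
  (acc, v) => v :: acc,   // fold a value into the partition-local list
  (a, b) => a ::: b)      // merge partial lists across partitions

viaReduce.collect()   // e.g. Array((UK,List(Yorkshire)), (USA,List(California, Colorado)))

In PySpark the same one-liner shape is a map to one-element lists followed by reduceByKey with list concatenation.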