customized comparator in groupByKey
I'd like to override the logic for comparing keys for equality in groupByKey. Much like combineByKey lets you pass in the combining logic for values, I'd like to do the same for keys. My code looks like this:

val res = rdd.groupBy(myPartitioner)

Here, rdd is of type RDD[(MyKey, MyValue)], so res turns out to be of type RDD[(MyKey, Seq[MyValue])]. MyKey is defined as

case class MyKey(field1: Int, field2: Int)

and in myPartitioner's getPartition(key: Any), key is of type MyKey and the partitioning logic is an expression over both field1 and field2. I'm guessing groupBy uses equals to compare instances of MyKey. Currently, the equals method of MyKey uses both field1 and field2, as is natural to its implementation. However, I'd like the groupBy to use only field1. Any pointers on how I can go about doing that? One way is the following, but I'd like to avoid creating all those MyNewKey objects:

val partitionedRdd = rdd.partitionBy(myPartitioner)
val mappedRdd = partitionedRdd.mapPartitions(
  partition => partition.map { case (myKey, myValue) => (new MyNewKey(myKey.field1), myValue) },
  preservesPartitioning = true)
val groupedRdd = mappedRdd.groupByKey()

Thanks, Ameet
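One way to group on field1 without a wrapper key class is to re-key on field1 directly and carry the original pair along as the value. A minimal sketch under that assumption (it still allocates tuples, but avoids a dedicated MyNewKey class; note the partitioner would then see Int keys rather than MyKey, so myPartitioner would need an Int-based equivalent, which is not shown here):

import org.apache.spark.SparkContext._  // brings groupByKey into scope on pair RDDs

// Re-key on field1 alone; the original (key, value) pair rides along in the value,
// so the full MyKey is still available after grouping.
val groupedByField1 = rdd
  .map { case (k, v) => (k.field1, (k, v)) }
  .groupByKey()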
Re: question on setup() and cleanup() methods for map() and reduce()
I don't think there is a setup() or cleanup() in Spark, but you can usually achieve the same thing with mapPartitions, putting the setup code at the top of the function passed to mapPartitions and the cleanup code at the end. The reason this usually works is that in Hadoop MapReduce, each map task runs over an input split; if you call mapPartitions on a HadoopRDD, each partition is effectively an input split. Ameet

On Mon, Apr 28, 2014 at 9:22 PM, Parsian, Mahmoud mpars...@illumina.com wrote: In classic MapReduce/Hadoop, you may optionally define setup() and cleanup() methods. They (setup() and cleanup()) are called for each task, so if you have 20 mappers running, setup/cleanup will be called for each one. What is the equivalent of these in Spark? Thanks, best regards, Mahmoud
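For illustration, a minimal sketch of that pattern; openConnection() and process() are hypothetical placeholders, not Spark or Hadoop APIs:

val result = rdd.mapPartitions { iter =>
  val conn = openConnection()                        // "setup": runs once per partition
  // Materialize the partition before closing: iterators are lazy, and the
  // connection must stay open while records are being processed.
  val processed = iter.map(rec => process(conn, rec)).toList
  conn.close()                                       // "cleanup": runs once per partition
  processed.iterator
}

One caveat: materializing the partition trades memory for correctness. If partitions are large, the usual alternative is a custom iterator that closes the resource once it is exhausted.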
Re: shuffle memory requirements
Turns out that my ulimit settings were too low. I bumped them up and the job completes successfully. Here's what I have now:

$ ulimit -u   # max user processes
81920
$ ulimit -n   # open files
81920

I was thrown off by the OutOfMemoryError into thinking Spark was running out of memory in the shuffle stage. My previous settings were 1024 for both, and while that worked for shuffles on small jobs (tens of gigs), it would choke on the large ones. It would be good to document these in the tuning / configuration section, something like section 2.5 "ulimit and nproc" of https://hbase.apache.org/book.html

14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:657)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
java.io.IOException: DFSOutputStream is closed
        at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
        at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
        at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
        at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
        at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
        at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
        at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:679)

Thanks, Ameet

On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini ameetk...@gmail.com wrote:

val hrdd = sc.hadoopRDD(..)
val res = hrdd.partitionBy(myCustomPartitioner)
              .reduceByKey(..)
              .mapPartitionsWithIndex( /* some code to save those partitions */ )

I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle. My custom partitioner generates over 20,000 partitions, so there are 20,000 tasks reading the shuffle files. On jobs with fewer partitions (~1,000), the job completes successfully. On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6 cores per executor and bringing it down to 3, and I still get OutOfMemoryErrors at 20,000 partitions. I have spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since I am not caching any RDDs. Do those config params look reasonable for my shuffle size? I'm not sure what to increase - shuffle.memoryFraction or the memory that the reduce tasks get. The latter, I'm guessing, is whatever is left after storage.memoryFraction and shuffle.memoryFraction. Thanks, Ameet
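For reference, a sketch of how the two memory fractions quoted above would be set through the SparkConf API (the property names and values are taken from the post; the app name is a placeholder, and the ulimit settings themselves are OS-level limits, not Spark configuration):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ingest")                        // placeholder name
  .set("spark.shuffle.memoryFraction", "0.5")  // heap fraction for shuffle buffers
  .set("spark.storage.memoryFraction", "0.2")  // heap fraction for cached RDDs
val sc = new SparkContext(conf)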
Re: shuffle memory requirements
A typo - I meant section 2.1.2.5 "ulimit and nproc" of https://hbase.apache.org/book.html

Ameet

On Fri, Apr 11, 2014 at 10:32 AM, Ameet Kini ameetk...@gmail.com wrote:
> [quote of the previous message trimmed]
Re: sort order after reduceByKey / groupByKey
I saw that, but I don't need a global sort, only an intra-partition sort. Ameet

On Thu, Mar 20, 2014 at 3:26 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: That's expected. I think sortByKey is an option too, probably a better one. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 3:20 PM, Ameet Kini ameetk...@gmail.com wrote:

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function)

I see that rdd2's partitions are not internally sorted. Can someone confirm that this is expected behavior? And if so, is the only way to get partitions internally sorted to follow it with something like this?

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function).mapPartitions(p => sort(p))

Thanks, Ameet
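A minimal sketch of that intra-partition sort, assuming an Ordering on the key type is in scope, each partition fits in memory, and myPartitioner / someFunction stand in for the partitioner and reduce function above:

val rdd2 = rdd
  .partitionBy(myPartitioner)
  .reduceByKey(someFunction)
  .mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator,
                 preservesPartitioning = true)
// preservesPartitioning = true keeps the partitioner metadata, since only
// the order within each partition changes, not the key-to-partition mapping.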