unsubscribe

2024-02-24 Thread Ameet Kini



customized comparator in groupByKey

2014-05-06 Thread Ameet Kini
I'd like to override the logic of comparing keys for equality in
groupByKey. Kinda like how combineByKey allows you to pass in the combining
logic for values, I'd like to do the same for keys.

My code looks like this:
val res = rdd.groupByKey(myPartitioner)
Here, rdd is of type RDD[(MyKey, MyValue)], so res turns out to be of type
RDD[(MyKey, Seq[MyValue])]

MyKey is defined as case class MyKey(field1: Int, field2: Int).
In myPartitioner's getPartition(key: Any), key is of type MyKey, and
the partitioning logic is an expression on both field1 and field2.

I'm guessing the groupBy uses equals to compare like instances of MyKey.
Currently, the equals method of MyKey uses both field1 and field2, as
would be natural to its implementation. However, I'd like to have the
groupBy only use field1. Any pointers on how I can go about doing it?

One way is the following, but I'd like to avoid creating all those MyNewKey
objects:
val partitionedRdd = rdd.partitionBy(myPartitioner)
val mappedRdd = partitionedRdd.mapPartitions(
  partition => partition.map { case (myKey, myValue) =>
    (new MyNewKey(myKey.field1), myValue)
  },
  preservesPartitioning = true)
val groupedRdd = mappedRdd.groupByKey()
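
To illustrate why the key's equality matters here, below is a minimal sketch in plain Scala collections (no Spark involved). It assumes that hash-based grouping relies on the key's equals and hashCode, which is the guess made above. The definition of MyNewKey is hypothetical, since the message does not show it; here it compares and hashes on field1 only.

```scala
// MyKey as described in the message: equality uses both fields.
case class MyKey(field1: Int, field2: Int)

// Hypothetical definition of MyNewKey (not shown in the original message):
// equality and hashing look at field1 only.
final class MyNewKey(val field1: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: MyNewKey => this.field1 == that.field1
    case _              => false
  }
  override def hashCode: Int = field1.hashCode
  override def toString: String = s"MyNewKey($field1)"
}

object GroupByField1 {
  // Stand-in for the contents of an RDD[(MyKey, String)].
  val pairs: Seq[(MyKey, String)] = Seq(
    MyKey(1, 10) -> "a",
    MyKey(1, 20) -> "b",
    MyKey(2, 10) -> "c"
  )

  // Grouping on MyKey keeps (1, 10) and (1, 20) in separate groups.
  val byFullKey: Map[MyKey, Seq[String]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Re-keying to MyNewKey merges every record that shares field1.
  val byField1: Map[MyNewKey, Seq[String]] =
    pairs
      .map { case (k, v) => (new MyNewKey(k.field1), v) }
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

If equals is overridden this way, hashCode has to agree with it, otherwise equal keys can land in different hash buckets and never be compared.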


Thanks,
Ameet


Re: question on setup() and cleanup() methods for map() and reduce()

2014-04-28 Thread Ameet Kini
I don't think there is a setup() or cleanup() in Spark but you can usually
achieve the same using mapPartitions and having the setup code at the top
of the mapPartitions and cleanup at the end.

The reason why this usually works is that in Hadoop map/reduce, each map
task runs over an input split. If you call mapPartitions over a HadoopRDD,
each partition is effectively an input split.
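
A minimal sketch of that pattern, written as a plain Scala function so it runs without Spark. The names processPartition and log are hypothetical; the function has the Iterator => Iterator shape that mapPartitions takes, and the "setup" and "cleanup" lines stand in for whatever resource handling is needed.

```scala
object SetupCleanupSketch {
  // Hypothetical per-partition function. `log` is an illustrative hook that
  // records what happens and in which order.
  def processPartition(records: Iterator[String],
                       log: String => Unit): Iterator[Int] = {
    log("setup") // runs once per partition, before any record

    // Iterators are lazy: returning records.map(...) directly would run the
    // cleanup below before a single record had been processed. Materializing
    // the results first keeps the order setup -> records -> cleanup.
    val results = records.map { r =>
      log(s"map:$r")
      r.length
    }.toList

    log("cleanup") // runs once per partition, after the last record
    results.iterator
  }
}
```

With an RDD this would be passed along the lines of rdd.mapPartitions(it => SetupCleanupSketch.processPartition(it, println)). Note that toList holds the whole partition's output in memory; for large partitions a wrapping iterator that runs the cleanup when hasNext first returns false is an alternative.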

Ameet


On Mon, Apr 28, 2014 at 9:22 PM, Parsian, Mahmoud mpars...@illumina.com wrote:

  In classic MapReduce/Hadoop, you may optionally define setup() and
 cleanup() methods.
  They ( setup() and cleanup() ) are called for each task, so if you have
 20 mappers running, the setup/cleanup will be called for each one.
 What is the equivalent of these in Spark?

  Thanks,
   best regards,
 Mahmoud





Re: shuffle memory requirements

2014-04-11 Thread Ameet Kini
Turns out that my ulimit settings were too low. I bumped them up and the job
now completes successfully. Here's what I have now:

$ ulimit -u   # max user processes
81920
$ ulimit -n   # open files
81920

I was thrown off by the OutOfMemoryError into thinking it is Spark running
out of memory in the shuffle stage. My previous settings were 1024 for
both, and while that worked for shuffle on small jobs (10s of gigs), it'd
choke on the large ones. It would be good to document these in the tuning /
configuration section. Something like section 2.5 ulimit and nproc of
https://hbase.apache.org/book.html


14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:657)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-zero-bytes blocks out of 7773 blocks
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  1 ms
14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
java.io.IOException: DFSOutputStream is closed
    at org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
    at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
    at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)

Thanks,
Ameet


On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini ameetk...@gmail.com wrote:

 val hrdd = sc.hadoopRDD(..)
 val res = hrdd.partitionBy(myCustomPartitioner)
   .reduceByKey(..)
   .mapPartitionsWithIndex( some code to save those partitions )

 I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My
 custom partitioner generates over 20,000 partitions, so there are 20,000
 tasks reading the shuffle files. On problems with low partitions (~ 1000),
 the job completes successfully.

 On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
 each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
 cores per executor and brought it down to 3, and I still get
 OutOfMemoryErrors at 20,000 partitions. I have
 spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
 I am not caching any RDDs.

 Do those config params look reasonable for my shuffle size ? I'm not sure
 what to increase - shuffle.memoryFraction or the memory that the reduce
 tasks get. The latter I am guessing is whatever is left after giving
 storage.memoryFraction and shuffle.memoryFraction.

 Thanks,
 Ameet





Re: shuffle memory requirements

2014-04-11 Thread Ameet Kini
A typo - I meant section 2.1.2.5 ulimit and nproc of
https://hbase.apache.org/book.html

Ameet



Re: sort order after reduceByKey / groupByKey

2014-03-20 Thread Ameet Kini
I saw that but I don't need a global sort, only intra-partition sort.

Ameet


On Thu, Mar 20, 2014 at 3:26 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 That's expected. I think sortByKey is an option too, and probably a better one.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 20, 2014 at 3:20 PM, Ameet Kini ameetk...@gmail.com wrote:


 val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function)

 I see that rdd2's partitions are not internally sorted. Can someone
 confirm that this is expected behavior? And if so, the only way to get
 partitions internally sorted is to follow it with something like this

 val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some
 function).mapPartitions(p => sort(p))

 Thanks,
 Ameet
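
The sort(p) step in the quoted message is left undefined. One possible shape for it, as a minimal sketch in plain Scala: sortPartition is a hypothetical name, and the sketch assumes that a single partition fits in memory, since the iterator has to be materialized before it can be sorted.

```scala
object SortWithinPartition {
  // Hypothetical helper with the Iterator => Iterator shape that
  // mapPartitions takes. Sorts one partition's records by key only;
  // nothing is compared across partitions.
  def sortPartition[K: Ordering, V](p: Iterator[(K, V)]): Iterator[(K, V)] =
    p.toVector.sortBy(_._1).iterator
}
```

With an RDD this would be used roughly as .mapPartitions(SortWithinPartition.sortPartition(_), preservesPartitioning = true), which orders records inside each partition without the extra shuffle that a global sortByKey implies.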