Re: Support for skewed joins in Spark

2015-03-12 Thread Soila Pertet Kavulya
Thanks Shixiong,

I'll try out your PR. Do you know what the status of the PR is? Are
there any plans to incorporate this change into DataFrames/SchemaRDDs
in Spark 1.3?

Soila

On Thu, Mar 12, 2015 at 7:52 PM, Shixiong Zhu zsxw...@gmail.com wrote:
 I sent a PR to add skewed join last year:
 https://github.com/apache/spark/pull/3505
 However, it does not split a key across multiple partitions. Instead, if a key
 has too many values to fit in memory, it stores the values on disk
 temporarily and uses the disk files to do the join.

 Best Regards,

 Shixiong Zhu

 2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com:

  Does Spark support skewed joins similar to Pig's, where large keys are
  distributed over multiple partitions? I tried using the RangePartitioner,
  but I am still experiencing failures because some keys are too large to
  fit in a single partition. I cannot use broadcast variables to
  work around this because both RDDs are too large to fit in driver
  memory.







Re: NegativeArraySizeException when doing joins on skewed data

2015-03-12 Thread Soila Pertet Kavulya
Hi Tristan,

Did upgrading to Kryo3 help?

Thanks,

Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers tris...@blackfrog.org wrote:
 Yeah I implemented the same solution. It seems to kick in around the 4B
 mark, but looking at the log I suspect it’s probably a function of the
 number of unique objects more than anything. I definitely don’t have more
 than 2B unique objects.


 Will try the same test on Kryo3 and see if it goes away.

 T
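
One workaround that is sometimes suggested for this particular failure (an assumption worth verifying, not something confirmed in this thread): the IdentityObjectIntMap in the trace is Kryo's reference-tracking table, which has to remember every unique object it has written, so with enough distinct objects it can overflow. If the serialized object graphs contain no cycles and no shared references that must be preserved, reference tracking can be switched off:

import org.apache.spark.SparkConf

// Sketch: disable Kryo reference tracking (safe only for acyclic object graphs
// where duplicated references do not need to be preserved).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.referenceTracking", "false") // default is true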


 On 27 February 2015 at 06:21, Soila Pertet Kavulya skavu...@gmail.com
 wrote:

 Thanks Tristan,

 I ran into a similar issue with broadcast variables. I worked around
 it by estimating the size of the object I want to broadcast, splitting
 it up into chunks that were less than 2G, then doing multiple
 broadcasts. This approach worked pretty well for broadcast variables
 less than 10GB on our system. However, for larger variables the spills
 to disk made progress painfully slow, so we need to do regular joins.
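
A minimal sketch of that chunking workaround (the helper name and chunk count are made up for illustration; it assumes the broadcast data is a plain lookup map):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Split one big map into pieces that each serialize to well under 2GB and
// broadcast the pieces separately.
def broadcastInChunks[K, V](sc: SparkContext, big: Map[K, V],
                            numChunks: Int): Seq[Broadcast[Map[K, V]]] =
  big.grouped(math.max(1, big.size / numChunks)).toSeq.map(sc.broadcast(_))

// Look a key up by probing each chunk in turn, e.g. inside a map():
//   parts.flatMap(_.value.get(key)).headOption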

 Do you know if there are any efforts to get Kryo to support objects
 larger than a couple of GBs?

 Soila

 On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers tris...@blackfrog.org
 wrote:
  I get the same exception simply by doing a large broadcast of about 6GB.
  Note that I’m broadcasting a small number (~3m) of fat objects. There’s
  plenty of free RAM. This and related Kryo exceptions seem to crop up
  whenever an object graph of more than a couple of GB gets passed around.
 
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
  at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
  at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
  at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)

  Caused by: java.lang.NegativeArraySizeException
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
  at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
  at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
  ... 23 more
 
 
 
 
  On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote:
 
  I have been running into NegativeArraySizeExceptions when doing joins on
  data with very skewed key distributions in Spark

Support for skewed joins in Spark

2015-03-12 Thread Soila Pertet Kavulya
Does Spark support skewed joins similar to Pig's, where large keys are
distributed over multiple partitions? I tried using the RangePartitioner,
but I am still experiencing failures because some keys are too large to
fit in a single partition. I cannot use broadcast variables to
work around this because both RDDs are too large to fit in driver
memory.
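
One workaround commonly applied here (not a built-in Spark 1.x feature; the helper below is only an illustrative sketch) is to "salt" the join keys so that a single hot key is spread over several partitions: every record on the large side gets a random salt appended to its key, and the other side is replicated once per salt value so that matching pairs still meet. In practice you would normally restrict the replication to the handful of hot keys so the replicated side stays small.

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Illustrative salted join; left is the large, skewed RDD.
def saltedJoin[K: ClassTag, V: ClassTag, W: ClassTag](
    left: RDD[(K, V)], right: RDD[(K, W)], salts: Int): RDD[(K, (V, W))] = {
  // Spread each key of the large side over `salts` sub-keys.
  val saltedLeft = left.map { case (k, v) => ((k, Random.nextInt(salts)), v) }
  // Replicate the other side once per salt so every sub-key finds its match.
  val replicatedRight = right.flatMap { case (k, w) =>
    (0 until salts).map(s => ((k, s), w))
  }
  // Join on the salted key, then drop the salt again.
  saltedLeft.join(replicatedRight).map { case ((k, _), pair) => (k, pair) }
}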




NegativeArraySizeException when doing joins on skewed data

2015-02-25 Thread soila
I have been running into NegativeArraySizeExceptions when doing joins on
data with very skewed key distributions in Spark 1.2.0. I found a previous
post that mentioned that this exception arises when the size of the blocks
spilled during the shuffle exceeds 2GB. The post recommended increasing the
number of partitions. I tried increasing the number of partitions and using
the RangePartitioner instead of the HashPartitioner, but I still encountered
the problem.

Does Spark support skewed joins similar to Pig?


com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
Serialization trace:
otherElements (org.apache.spark.util.collection.CompactBuffer)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NegativeArraySizeException
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
at

Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread soila
I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the
following exception when the size of the broadcast variable exceeds 2GB. Any
ideas on how I can resolve this issue?

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)








Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Soila Pertet Kavulya
Thanks Sean and Imran,

I'll try splitting the broadcast variable into smaller ones.

I had tried a regular join but it was failing due to high garbage
collection overhead during the shuffle. One of the RDDs is very large
and has a skewed distribution where a handful of keys account for 90%
of the data. Do you have any pointers on how to handle skewed key
distributions during a join?
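
One approach that may help when a handful of keys dominate (an illustrative sketch with made-up names, not an established Spark API): sample the keys to find the heavy hitters, broadcast only the other side's rows for those keys, and shuffle-join everything else as usual. This assumes the hot-key slice of the second RDD fits on the driver and that it has a single value per key.

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def skewAwareJoin[K: ClassTag, V: ClassTag, W: ClassTag](
    sc: SparkContext,
    left: RDD[(K, V)],   // large, skewed side
    right: RDD[(K, W)],  // side whose hot-key slice is small enough to collect
    hotKeyThreshold: Long): RDD[(K, (V, W))] = {
  // Estimate key frequencies from a small sample; frequent keys are "hot".
  val hot = left.keys.sample(false, 0.001).countByValue()
    .filter { case (_, n) => n >= hotKeyThreshold }.keys.toSet
  // Map-side join for the hot keys only, so they never go through the shuffle.
  val hotLookup = sc.broadcast(right.filter(kv => hot(kv._1)).collect().toMap)
  val hotJoined = left.filter(kv => hot(kv._1)).flatMap { case (k, v) =>
    hotLookup.value.get(k).map(w => (k, (v, w)))
  }
  // Ordinary shuffle join for everything else.
  val coldJoined = left.filter(kv => !hot(kv._1))
    .join(right.filter(kv => !hot(kv._1)))
  hotJoined.union(coldJoined)
}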

Soila

On Fri, Feb 13, 2015 at 10:49 AM, Imran Rashid iras...@cloudera.com wrote:
 unfortunately this is a known issue:
 https://issues.apache.org/jira/browse/SPARK-1476

 as Sean suggested, you need to think of some other way of doing the same
 thing, even if it's just breaking your one big broadcast variable into a few
 smaller ones.

 On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 I think you've hit the nail on the head. Since the serialization
 ultimately creates a byte array, and arrays can have at most ~2
 billion elements in the JVM, the broadcast can be at most ~2GB.

 At that scale, you might consider whether you really have to broadcast
 these values, or want to handle them as RDDs and join and so on.

 Or consider whether you can break it up into several broadcasts?
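
A minimal sketch of the first suggestion, with made-up names and a hypothetical input path: keep the would-be broadcast data as an RDD and use an ordinary shuffle join, which is not subject to the single-byte-array 2GB limit.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Sketch: instead of collecting a multi-GB table to the driver and
// broadcasting it, load it straight into an RDD and shuffle-join against it.
def joinInsteadOfBroadcast(sc: SparkContext,
                           events: RDD[(String, String)]): RDD[(String, (String, Long))] = {
  // Hypothetical path and record format; the point is that the table never
  // has to exist as one serialized blob.
  val lookup: RDD[(String, Long)] = sc.textFile("hdfs:///path/to/lookup")
    .map { line => val Array(k, v) = line.split('\t'); (k, v.toLong) }
  events.join(lookup, 400) // enough partitions to keep each block well under 2GB
}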


 On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:
  I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get
  the
  following exception when the size of the broadcast variable exceeds 2GB.
  Any
  ideas on how I can resolve this issue?
 
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
  at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
  at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
  at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
 
 
 
 
 
 







Re: Largest input data set observed for Spark.

2014-03-20 Thread Soila Pertet Kavulya
Hi Reynold,

Nice! What Spark configuration parameters did you use to get your job to
run successfully on a large dataset? My job is failing on 1TB of input data
(uncompressed) on a 4-node cluster (64GB memory per node). No OutOfMemory
errors, just lost executors.

Thanks,

Soila
On Mar 20, 2014 11:29 AM, Reynold Xin r...@databricks.com wrote:

 I'm not really at liberty to discuss details of the job. It involves some
 expensive aggregated statistics, and took 10 hours to complete (mostly
 bottlenecked by network I/O).





 On Thu, Mar 20, 2014 at 11:12 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Reynold,

 How complex was that job (I guess in terms of number of transforms and
 actions) and how long did that take to process?

 -Suren



 On Thu, Mar 20, 2014 at 2:08 PM, Reynold Xin r...@databricks.com wrote:

  Actually we just ran a job with 70TB+ compressed data on 28 worker nodes -
  I didn't count the size of the uncompressed data, but I am guessing it is
  somewhere between 200TB and 700TB.
 
 
 
  On Thu, Mar 20, 2014 at 12:23 AM, Usman Ghani us...@platfora.com
 wrote:
 
   All,
   What is the largest input data set y'all have come across that has been
   successfully processed in production using Spark? Ballpark?
  
 



 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





saveAsTextFile() failing for large datasets

2014-03-19 Thread Soila Pertet Kavulya
I am testing the performance of Spark to see how it behaves when the
dataset size exceeds the amount of memory available. I am running
wordcount on a 4-node cluster (Intel Xeon 16 cores (32 threads), 256GB
RAM per node). I limited spark.executor.memory to 64g, so I have 256g
of memory available in the cluster. Wordcount fails due to connection
errors during saveAsTextFile() when the input size is 1TB. I have
tried experimenting with different timeouts and Akka frame sizes, but
the job is still failing. Are there any changes that I should make to
get the job to run successfully?

Here is my most recent config.

SPARK_WORKER_CORES=32
SPARK_WORKER_MEMORY=64g
SPARK_WORKER_INSTANCES=1
SPARK_DAEMON_MEMORY=1g

SPARK_JAVA_OPTS=-Dspark.executor.memory=64g
-Dspark.default.parallelism=128 -Dspark.deploy.spreadOut=true
-Dspark.storage.memoryFraction=0.5
-Dspark.shuffle.consolidateFiles=true -Dspark.akka.frameSize=200
-Dspark.akka.timeout=300
-Dspark.storage.blockManagerSlaveTimeoutMs=30
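
For reference, a sketch of the same properties set programmatically through SparkConf (the app name is made up and the values are copied verbatim from the options above; whether they are the right values for this workload is exactly the open question):

import org.apache.spark.{SparkConf, SparkContext}

// Same settings as the SPARK_JAVA_OPTS above, expressed via SparkConf.
val conf = new SparkConf()
  .setAppName("wordcount-1tb") // hypothetical app name
  .set("spark.executor.memory", "64g")
  .set("spark.default.parallelism", "128")
  .set("spark.deploy.spreadOut", "true")
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.akka.frameSize", "200")
  .set("spark.akka.timeout", "300")
  .set("spark.storage.blockManagerSlaveTimeoutMs", "30")
val sc = new SparkContext(conf)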


Error logs:
14/03/17 13:07:52 WARN ExternalAppendOnlyMap: Spilling in-memory map
of 584 MB to disk (1 time so far)

14/03/17 13:07:52 WARN ExternalAppendOnlyMap: Spilling in-memory map
of 510 MB to disk (1 time so far)

14/03/17 13:08:03 INFO ConnectionManager: Removing ReceivingConnection
to ConnectionManagerId(node02,56673)

14/03/17 13:08:03 INFO ConnectionManager: Removing SendingConnection
to ConnectionManagerId(node02,56673)

14/03/17 13:08:03 INFO ConnectionManager: Removing SendingConnection
to ConnectionManagerId(node02,56673)

14/03/17 13:08:03 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@2c762242

14/03/17 13:08:03 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@7fc331db

14/03/17 13:08:03 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@220eecfa

14/03/17 13:08:03 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@286cca3

14/03/17 13:08:03 ERROR
BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s)
from ConnectionManagerId(node02,56673)

14/03/17 13:08:03 ERROR
BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s)
from ConnectionManagerId(node02,56673)

14/03/17 13:08:03 ERROR
BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s)
from ConnectionManagerId(node02,56673)

14/03/17 13:08:03 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@2d1d6b0a

14/03/17 13:08:03 ERROR
BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s)
from ConnectionManagerId(node02,56673)