There is an open PR [1] to support broadcasts larger than 2G. Could you try it?

[1] https://github.com/apache/spark/pull/2659

On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon <mr.tom.sed...@gmail.com> wrote:
> Hi,
>
> Just wondering if anyone has any advice about this issue, as I am
> experiencing the same thing.  I'm working with multiple broadcast variables
> in PySpark, most of which are small, but one of around 4.5GB, using 10
> workers at 31GB memory each and driver with same spec.  It's not running out
> of memory as far as I can see, but definitely only happens when I add the
> large broadcast.  Would be most grateful for advice.
>
> I tried playing around with the last 3 conf settings below, but no luck:
>
> SparkConf().set("spark.master.memory", "26")
> .set("spark.executor.memory", "26")
> .set("spark.worker.memory", "26")
> .set("spark.driver.memory", "26").
> .set("spark.storage.memoryFraction","1")
> .set("spark.core.connection.ack.wait.timeout","6000")
> .set("spark.akka.frameSize","50")

There are some invalid configs here: spark.master.memory and
spark.worker.memory are not valid settings.

spark.storage.memoryFraction is too large. With it set to 1, you will have
no memory left for general use (such as shuffle).
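A minimal sketch of the two fixes above, in plain Python so it runs without a cluster. The settings are copied from the quoted message; the helper name clean_conf and the replacement value 0.6 (which I believe is the Spark 1.x default for spark.storage.memoryFraction) are assumptions for illustration, not something tested on your setup.

```python
# Settings as quoted in the message above (values copied verbatim).
quoted = {
    "spark.master.memory": "26",
    "spark.executor.memory": "26",
    "spark.worker.memory": "26",
    "spark.driver.memory": "26",
    "spark.storage.memoryFraction": "1",
    "spark.core.connection.ack.wait.timeout": "6000",
    "spark.akka.frameSize": "50",
}

# The two keys called out above as not valid.
INVALID_KEYS = {"spark.master.memory", "spark.worker.memory"}


def clean_conf(settings, storage_fraction="0.6"):
    """Drop the invalid keys and lower spark.storage.memoryFraction.

    storage_fraction is an assumed replacement value, not a recommendation
    from the thread.
    """
    cleaned = {k: v for k, v in settings.items() if k not in INVALID_KEYS}
    if "spark.storage.memoryFraction" in cleaned:
        cleaned["spark.storage.memoryFraction"] = storage_fraction
    return cleaned


cleaned = clean_conf(quoted)
for key in sorted(cleaned):
    print(key, "=", cleaned[key])
```

The cleaned pairs could then be handed to SparkConf().setAll(list(cleaned.items())) before the SparkContext is created.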

Python jobs run in separate processes, so you should leave some
memory for them on the slaves. For example, if a slave has 8 CPUs, each
Python process will need at least 8G (for the 4G broadcast plus object
overhead in Python), so you can run only 3 processes at the same time:
use spark.cores.max=2 or spark.task.cpus=4. You can also set
spark.executor.memory to only 10G, leaving 16G for Python.
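The arithmetic behind this can be sketched roughly as below. It takes the figures from the paragraph above (8 cores, about 8G per Python process, 16G left for Python after a 10G executor); the function names are hypothetical, and the model ignores everything except these numbers.

```python
def max_python_workers(python_budget_gb, per_worker_gb):
    """How many Python worker processes fit in the memory left for Python."""
    return python_budget_gb // per_worker_gb


def concurrent_tasks(cores, task_cpus):
    """Tasks that run at once when each task reserves task_cpus cores."""
    return cores // task_cpus


# Figures from the paragraph above; treat them as rough estimates.
fit = max_python_workers(python_budget_gb=16, per_worker_gb=8)
running = concurrent_tasks(cores=8, task_cpus=4)

print("Python workers that fit in 16G:", fit)
print("Concurrent tasks with spark.task.cpus=4:", running)

# The point of the setting: never run more tasks than there is memory for.
assert running <= fit
```

With the 10G/16G split, two 8G Python processes fit, which matches the two concurrent tasks that spark.task.cpus=4 allows on 8 cores.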

You could also set spark.python.worker.memory to 8G to get better
shuffle performance in Python, but this is not required.
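Put together, the settings suggested so far could be written out as below. This is only a sketch: the values are the ones mentioned above, the "g" suffixes and the helper name to_defaults_lines are my assumptions, and the output uses the whitespace-separated "key value" layout of spark-defaults.conf.

```python
# Values taken from the suggestions above; adjust them to your hardware.
suggested = {
    "spark.executor.memory": "10g",
    "spark.task.cpus": "4",
    "spark.python.worker.memory": "8g",  # optional, only helps shuffle
}


def to_defaults_lines(settings):
    """Render settings as 'key value' lines, sorted by key."""
    return ["{} {}".format(key, settings[key]) for key in sorted(settings)]


lines = to_defaults_lines(suggested)
print("\n".join(lines))
```

The same dictionary could equally be applied in the script itself with SparkConf().setAll(list(suggested.items())).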

So, for a large broadcast, maybe you should use Scala. It runs tasks as
multiple threads, so the broadcast is shared by all tasks in the
same executor.
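To make the difference concrete, here is a deliberately simplified model of the memory needed for the broadcast on one worker. It assumes one full copy per Python process versus a single shared copy in the JVM, and nothing else; the function name and the 8G-per-copy figure (from the estimate above) are illustrative only.

```python
def broadcast_footprint_gb(per_copy_gb, concurrent_tasks, shared):
    """Memory held by the broadcast on one worker.

    shared=True models tasks running as threads in one process (one copy);
    shared=False models one copy per worker process.
    """
    copies = 1 if shared else concurrent_tasks
    return per_copy_gb * copies


# 8 concurrent tasks, about 8G per deserialized copy (estimate from above).
per_process = broadcast_footprint_gb(8, 8, shared=False)
per_executor = broadcast_footprint_gb(8, 8, shared=True)

print("one copy per process:", per_process, "G")
print("one shared copy:", per_executor, "G")
```

The real numbers will differ (the JVM copy will not be the same size as the Python one), but the scaling with the number of concurrent tasks is the part that matters.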

> Thanks,
>
> Tom
>
>
> On 24 October 2014 12:31, htailor <hemant.tai...@live.co.uk> wrote:
>>
>> Hi All,
>>
>> I am relatively new to spark and currently having troubles with
>> broadcasting
>> large variables ~500mb in size. The
>> broadcast fails with an error shown below and the memory usage on the
>> hosts also blow up.
>>
>> Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb
>> (workers))
>> and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.
>>
>> We have managed to replicate the error using a test script shown below. I
>> would be interested to know if anyone has seen this before with
>> broadcasting
>> or know of a fix.
>>
>> =========== ERROR ==============
>>
>> 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
>> 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
>> sun.nio.ch.SelectionKeyImpl@fbc6caf
>> 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
>> ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
>> 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
>> 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
>> 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@fbc6caf
>> java.nio.channels.CancelledKeyException
>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
>> 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
>> ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
>> 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
>> 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
>> 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
>> ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
>> 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
>> sun.nio.ch.SelectionKeyImpl@3ecfdb7e
>> 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
>> 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
>> 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@3ecfdb7e
>> java.nio.channels.CancelledKeyException
>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
>> 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
>> ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
>> 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
>> 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
>> ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
>> 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
>> ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
>> 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
>> SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
>> java.nio.channels.ClosedChannelException
>>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
>> SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
>> java.nio.channels.ClosedChannelException
>>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> 14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on
>> connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
>>
>>
>> =========== PYTHON SCRIPT ==============
>>
>> #!/usr/bin/pyspark
>>
>> import subprocess
>> from random import choice
>> import string
>>
>> from pyspark import SparkContext, SparkConf
>>
>> path_hdfs_broadcast_test = "broadcast_test/general_test_test"
>>
>> subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
>> stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
>>
>> sconf = SparkConf().set('spark.default.parallelism',
>> '2048').set('spark.executor.memory', '24g')
>>
>> sc = SparkContext("spark://toppigeon.ldn.ebs.io:7077", "Broadcast test",
>> conf=sconf)
>>
>> a = 20000000
>> b = sc.parallelize(range(a))
>> c = sc.parallelize([(x, ''.join(choice(string.ascii_uppercase +
>> string.digits) for _ in range(20))) for x in range(a)])
>> c.cache()
>> d = sc.broadcast(c.collectAsMap())
>> b = b.map(lambda x: (x, d.value[x]))
>> b.saveAsTextFile(path_hdfs_broadcast_test)
>>
>> sc.stop()
>>
>> =======================================
>>
>>
>> Any help is much appreciated.
>>
>> Thanks
>>
>> Hemant
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-failure-with-variable-size-of-500mb-with-key-already-cancelled-tp17200.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>