Hi,

Does anyone have any advice on this issue? I am experiencing the same
thing. I'm working with multiple broadcast variables in PySpark. Most are
small, but one is around 4.5GB. I'm using 10 workers with 31GB of memory
each and a driver with the same spec. As far as I can see it's not running
out of memory, but the failure definitely only happens when I add the large
broadcast. I would be most grateful for any advice.

I tried playing around with the last 3 conf settings below, but no luck:

(SparkConf()
    .set("spark.master.memory", "26")
    .set("spark.executor.memory", "26")
    .set("spark.worker.memory", "26")
    .set("spark.driver.memory", "26")
    .set("spark.storage.memoryFraction", "1")
    .set("spark.core.connection.ack.wait.timeout", "6000")
    .set("spark.akka.frameSize", "50"))
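
One thing that may be worth ruling out with settings like these: Spark's
memory options are normally written with a unit suffix (for example "26g"),
and a bare "26" may not be read as gigabytes. Below is a minimal sketch of a
sanity check. The names check_memory_settings and MEMORY_KEYS are
hypothetical, introduced only for illustration, and the check covers just
the two memory properties I am sure Spark documents.

```python
import re

# Properties from the settings above that hold a memory size.
# (Assumption: only these two are checked in this sketch.)
MEMORY_KEYS = (
    "spark.executor.memory",
    "spark.driver.memory",
)

# A number followed by a unit suffix, e.g. "512m" or "26g".
_MEMORY_RE = re.compile(r"^\d+[kmgt]$", re.IGNORECASE)


def check_memory_settings(settings):
    """Return the memory keys in `settings` whose value lacks a unit suffix."""
    return [
        key
        for key in MEMORY_KEYS
        if key in settings and not _MEMORY_RE.match(settings[key])
    ]


settings = {
    "spark.executor.memory": "26",   # as posted: no unit
    "spark.driver.memory": "26g",    # explicit gigabytes
    "spark.akka.frameSize": "50",    # not one of MEMORY_KEYS, so not checked
}

suspect = check_memory_settings(settings)
print(suspect)
```

Once every flagged value has a unit, the dictionary could be applied with
SparkConf().setAll(list(settings.items())).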

Thanks,

Tom


On 24 October 2014 12:31, htailor <hemant.tai...@live.co.uk> wrote:

> Hi All,
>
> I am relatively new to Spark and am currently having trouble broadcasting
> large variables (~500mb in size). The broadcast fails with the error shown
> below, and memory usage on the hosts also blows up.
>
> Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb (workers))
> and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.
>
> We have managed to replicate the error using the test script shown below.
> I would be interested to know if anyone has seen this before with
> broadcasting or knows of a fix.
>
> =========== ERROR ==============
>
> 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
> 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@fbc6caf
> 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
> 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
> 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
> 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@fbc6caf
> java.nio.channels.CancelledKeyException
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
> 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
> 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
> 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
> 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
> 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
> 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
> 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
> 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
> java.nio.channels.CancelledKeyException
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
> 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
> 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
> 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
> 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
> 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>         at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>         at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
>
>
> =========== PYTHON SCRIPT ==============
>
> #!/usr/bin/pyspark
>
> import subprocess
> from random import choice
> import string
>
> from pyspark import SparkContext, SparkConf
>
> path_hdfs_broadcast_test = "broadcast_test/general_test_test"
>
> # Remove output from any previous run and wait for the command to finish.
> subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
>                  stdout=subprocess.PIPE,
>                  stderr=subprocess.STDOUT).communicate()
>
> sconf = (SparkConf()
>          .set('spark.default.parallelism', '2048')
>          .set('spark.executor.memory', '24g'))
>
> sc = SparkContext("spark://toppigeon.ldn.ebs.io:7077", "Broadcast test",
>                   conf=sconf)
>
> a = 20000000
> b = sc.parallelize(range(a))
>
> # Build (key, random 20-character string) pairs on the driver.
> alphabet = string.ascii_uppercase + string.digits
> c = sc.parallelize([(x, ''.join(choice(alphabet) for _ in range(20)))
>                     for x in range(a)])
> c.cache()
>
> # Collect the pairs into a dict on the driver and broadcast it (~500mb).
> d = sc.broadcast(c.collectAsMap())
>
> # Look up every key in the broadcast dict on the workers.
> b = b.map(lambda x: (x, d.value[x]))
> b.saveAsTextFile(path_hdfs_broadcast_test)
>
> sc.stop()
>
> =======================================
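
To check how large the broadcast value in a script like this really is
before sending it, one option is to pickle a small sample of the same kind
of dictionary and scale up. This is only a rough sketch:
estimate_pickled_size and random_value are hypothetical helpers, the sample
size and pickle protocol are arbitrary choices, and the real figure depends
on the serializer PySpark uses. The in-memory dictionary on each worker will
typically be larger than its pickled form.

```python
import pickle
import string
from random import choice


def random_value(length=20):
    """Build a random string like the values in the test script."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(choice(alphabet) for _ in range(length))


def estimate_pickled_size(total_entries, sample_entries=10000):
    """Estimate the pickled size in bytes of a dict with `total_entries` items.

    Pickles a dict of `sample_entries` items and scales the result linearly.
    """
    sample = {x: random_value() for x in range(sample_entries)}
    sample_bytes = len(pickle.dumps(sample, protocol=2))
    return sample_bytes * total_entries // sample_entries


# 20000000 is the value of `a` in the test script.
estimated = estimate_pickled_size(20000000)
print("approx. %.0f MB pickled" % (estimated / (1024.0 * 1024.0)))
```

If the estimate is far above what the workers can comfortably hold several
copies of, that would be a reason to look at memory rather than the network
layer first.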
>
>
> Any help is much appreciated.
>
> Thanks
>
> Hemant
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-failure-with-variable-size-of-500mb-with-key-already-cancelled-tp17200.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
