Hi All, I am relatively new to Spark and am currently having trouble broadcasting large variables (~500 MB in size). The broadcast fails with the error shown below, and memory usage on the hosts also blows up.
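For what it's worth, the ~500 MB figure is roughly the pickled size of the lookup map being broadcast (20 million int -> 20-character-string entries). A quick stand-alone estimate, pure Python with no Spark needed — the sample-and-extrapolate approach here is just illustrative, not how the number was originally measured:

```python
import pickle
import string
from random import choice

# Build a small sample of the (int -> random 20-char string) map and
# extrapolate its pickled size to the full 20 million entries, to avoid
# materialising the whole dict here.
sample_n = 100000
full_n = 20000000
sample = {x: ''.join(choice(string.ascii_uppercase + string.digits)
                     for _ in range(20))
          for x in range(sample_n)}
sample_bytes = len(pickle.dumps(sample, protocol=pickle.HIGHEST_PROTOCOL))
estimated_mb = sample_bytes * (float(full_n) / sample_n) / (1024 ** 2)
print("estimated pickled size: ~%d MB" % estimated_mb)
```

On my machine this lands in the same ballpark as the ~500 MB we see Spark trying to ship to each worker.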
Our hardware consists of 8 hosts (1 x 64 GB driver and 7 x 32 GB workers), and we are using Spark 1.1 (Python) via Cloudera CDH 5.2. We have managed to replicate the error with the test script shown below. I would be interested to know if anyone has seen this before with broadcasting or knows of a fix.

=========== ERROR ==============
14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
14/10/24 08:20:08 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@fbc6caf
14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@fbc6caf
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)

=========== PYTHON SCRIPT ==============
#!/usr/bin/pyspark
import subprocess
import string
from random import choice

from pyspark import SparkContext, SparkConf

path_hdfs_broadcast_test = "broadcast_test/general_test_test"

# Remove any output left over from a previous run.
subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

sconf = (SparkConf()
         .set('spark.default.parallelism', '2048')
         .set('spark.executor.memory', '24g'))
sc = SparkContext("spark://toppigeon.ldn.ebs.io:7077", "Broadcast test",
                  conf=sconf)

# 20 million (int, random 20-char string) pairs, collected to the driver
# and broadcast as a dict of roughly 500 MB.
a = 20000000
b = sc.parallelize(range(a))
c = sc.parallelize([(x, ''.join(choice(string.ascii_uppercase + string.digits)
                                for _ in range(20)))
                    for x in range(a)])
c.cache()

d = sc.broadcast(c.collectAsMap())
b = b.map(lambda x: (x, d.value[x]))
b.saveAsTextFile(path_hdfs_broadcast_test)
sc.stop()
=======================================

Any help is much appreciated.

Thanks
Hemant

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-failure-with-variable-size-of-500mb-with-key-already-cancelled-tp17200.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.