Hi All,

I am relatively new to Spark and am currently having trouble broadcasting
large variables (~500 MB in size). The broadcast fails with the error shown
below, and memory usage on the hosts also blows up.

Our hardware consists of 8 hosts (1 x 64 GB driver and 7 x 32 GB workers),
and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

We have managed to reproduce the error using the test script shown below. I
would be interested to know if anyone has seen this before with broadcasting
or knows of a fix.

=========== ERROR ==============

14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
14/10/24 08:20:08 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@fbc6caf
14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@fbc6caf
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)


=========== PYTHON SCRIPT ==============

#!/usr/bin/pyspark

import string
import subprocess
from random import choice

from pyspark import SparkContext, SparkConf

path_hdfs_broadcast_test = "broadcast_test/general_test_test"

# Remove output from a previous run. communicate() waits for the delete to
# finish, so it cannot race with saveAsTextFile() below.
subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT).communicate()

sconf = (SparkConf()
         .set('spark.default.parallelism', '2048')
         .set('spark.executor.memory', '24g'))

sc = SparkContext("spark://toppigeon.ldn.ebs.io:7077", "Broadcast test",
                  conf=sconf)

a = 20000000
chars = string.ascii_uppercase + string.digits

b = sc.parallelize(range(a))
# 20 million (int, 20-character random string) pairs, built on the driver.
c = sc.parallelize([(x, ''.join(choice(chars) for _ in range(20)))
                    for x in range(a)])
c.cache()
# Collect the pairs into a dict on the driver and broadcast it.
d = sc.broadcast(c.collectAsMap())
# Look up every key in the broadcast dict on the workers.
b = b.map(lambda x: (x, d.value[x]))
b.saveAsTextFile(path_hdfs_broadcast_test)

sc.stop()

=======================================
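
For anyone who wants to sanity-check the size of the broadcast value without
a cluster, a rough approach is to pickle a small sample with the same
key/value shape as the script above and extrapolate to 20 million entries.
This is only a sketch: the helper names (random_value, estimate_pickled_size)
and the sample size are illustrative, pickle protocol 2 is an assumption, and
the bytes Spark actually ships for a broadcast may differ. Small sample keys
also pickle slightly smaller than large ones, so the figure is a low estimate.

```python
import pickle
import string
from random import choice


def random_value(length=20):
    """Build one random value of the same shape as in the test script."""
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(choice(alphabet) for _ in range(length))


def estimate_pickled_size(total_entries, sample_entries=10000):
    """Extrapolate the pickled size (in bytes) of the full map from a sample."""
    sample = dict((x, random_value()) for x in range(sample_entries))
    sample_bytes = len(pickle.dumps(sample, protocol=2))
    # Scale linearly from the sample to the full number of entries.
    return sample_bytes * total_entries // sample_entries


estimated = estimate_pickled_size(20000000)
print("estimated pickled size: %.0f MB" % (estimated / 1e6))
```

This runs in plain Python on the driver host and needs no SparkContext.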


Any help is much appreciated.

Thanks

Hemant
