Re: Broadcast failure with variable size of ~500mb with "key already cancelled"?

Hi,

Just wondering if anyone has any advice about this issue, as I am experiencing the same thing. I'm working with multiple broadcast variables in PySpark, most of which are small, but one is around 4.5GB, using 10 workers at 31GB memory each and a driver with the same spec. It's not running out of memory as far as I can see, but this definitely only happens when I add the large broadcast. I would be most grateful for advice. I tried playing around with the last three conf settings below, but no luck:

SparkConf().set("spark.master.memory", "26")
    .set("spark.executor.memory", "26")
    .set("spark.worker.memory", "26")
    .set("spark.driver.memory", "26")
    .set("spark.storage.memoryFraction", "1")
    .set("spark.core.connection.ack.wait.timeout", "6000")
    .set("spark.akka.frameSize", "50")

Thanks,
Tom

On 24 October 2014 12:31, htailor <hemant.tai...@live.co.uk> wrote:
> [quoted message and error log trimmed; the original post appears in full below]
Re: Broadcast failure with variable size of ~500mb with "key already cancelled"?

There is an open PR [1] to support broadcasts larger than 2G, could you try it?

[1] https://github.com/apache/spark/pull/2659

On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon <mr.tom.sed...@gmail.com> wrote:
> I tried playing around with the last 3 conf settings below, but no luck:
>
> SparkConf().set("spark.master.memory", "26")
>     .set("spark.executor.memory", "26")
>     .set("spark.worker.memory", "26")
>     .set("spark.driver.memory", "26")
>     .set("spark.storage.memoryFraction", "1")
>     .set("spark.core.connection.ack.wait.timeout", "6000")
>     .set("spark.akka.frameSize", "50")

There are some invalid configs here: spark.master.memory and spark.worker.memory are not valid settings. spark.storage.memoryFraction is also too large; with it set to 1 you will have no memory left for general use (such as shuffle).

The Python jobs run in separate processes, so you should leave some memory for them on the slaves. For example, if a host has 8 CPUs, each Python process will need at least 8G (for the 4G broadcast plus object overhead in Python), so you can run only 3 processes at the same time; use spark.cores.max=2 OR spark.task.cpus=4. Alternatively, you can set spark.executor.memory to only 10G, leaving 16G for Python. You could also specify spark.python.worker.memory=8G to get better shuffle performance in Python (it's not necessary).

So, for a large broadcast, maybe you should use Scala, which uses multiple threads, so the broadcast is shared by multiple tasks in the same executor.

> [rest of quoted message trimmed]
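To make the sizing advice above concrete, here is a sketch of what the configuration might look like with the invalid keys dropped. The keys are valid Spark 1.x settings, but the values are illustrative assumptions for 31GB workers with 8 cores, not tested numbers:

```python
from pyspark import SparkConf

# Sketch only: valid keys, illustrative values based on the advice above.
conf = (SparkConf()
        .set("spark.executor.memory", "10g")      # leave ~16G per host for Python workers
        .set("spark.task.cpus", "4")              # cap concurrent tasks so each Python
                                                  # process can hold the large broadcast
        .set("spark.python.worker.memory", "8g")  # optional: better Python shuffle behaviour
        .set("spark.core.connection.ack.wait.timeout", "6000"))
```

spark.cores.max=2 would be the cluster-wide alternative to spark.task.cpus=4 mentioned above; either way the goal is fewer simultaneous Python processes per host.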
Broadcast failure with variable size of ~500mb with "key already cancelled"?

Hi All,

I am relatively new to Spark and am currently having trouble broadcasting large variables ~500mb in size. The broadcast fails with the error shown below and the memory usage on the hosts also blows up. Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb (workers)) and we are using Spark 1.1 (Python) via Cloudera CDH 5.2. We have managed to replicate the error using the test script shown below. I would be interested to know if anyone has seen this before with broadcasting or knows of a fix.

=== ERROR ===

14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
14/10/24 08:20:08 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@fbc6caf
14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@fbc6caf
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@3ecfdb7e
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)

=== PYTHON SCRIPT ===

#!/usr/bin/pyspark
import subprocess
from random import choice
import string

from pyspark import SparkContext, SparkConf

path_hdfs_broadcast_test = "broadcast_test/general_test_test"
subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

sconf =
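The script above is cut off in the archive. For reference, here is a self-contained sketch of the kind of large value such a test would broadcast, built with the same `random.choice`/`string` imports the script uses. The payload size is shrunk from ~500MB to 1KB so it runs instantly, and the broadcast step is shown only as a comment since it needs a live SparkContext; the function name is hypothetical, not taken from the original script:

```python
import string
from random import choice

def make_payload(n_bytes):
    """Build a random ASCII string of roughly n_bytes, as a hypothetical
    stand-in for the large variable the test script broadcasts."""
    return "".join(choice(string.ascii_letters) for _ in range(n_bytes))

# Shrunk from the ~500MB used in the original test to 1KB for illustration.
payload = make_payload(1024)

# With a SparkContext `sc` available, the broadcast itself would be:
#   b = sc.broadcast(payload)
#   ... rdd.map(lambda x: len(b.value)) ...   # read b.value on the executors
```

Scaling n_bytes back up to the hundreds of megabytes is what triggers the ConnectionManager errors shown in the log.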