Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-11-11 Thread Tom Seddon
Hi,

Just wondering if anyone has any advice about this issue, as I am
experiencing the same thing.  I'm working with multiple broadcast variables
in PySpark, most of which are small, but one of around 4.5GB, using 10
workers at 31GB memory each and driver with same spec.  It's not running
out of memory as far as I can see, but definitely only happens when I add
the large broadcast.  Would be most grateful for advice.

I tried playing around with the last 3 conf settings below, but no luck:

SparkConf().set("spark.master.memory", "26")
.set("spark.executor.memory", "26")
.set("spark.worker.memory", "26")
.set("spark.driver.memory", "26")
.set("spark.storage.memoryFraction", "1")
.set("spark.core.connection.ack.wait.timeout", "6000")
.set("spark.akka.frameSize", "50")

Thanks,

Tom


On 24 October 2014 12:31, htailor hemant.tai...@live.co.uk wrote:

 Hi All,

 I am relatively new to Spark and am currently having trouble broadcasting
 large variables ~500mb in size. The broadcast fails with the error shown
 below, and the memory usage on the hosts also blows up.

 Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb (workers))
 and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

 We have managed to replicate the error using the test script shown below. I
 would be interested to know if anyone has seen this before with broadcasting
 or knows of a fix.

 === ERROR ==

 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 java.nio.channels.CancelledKeyException
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 java.nio.channels.CancelledKeyException
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
 SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 java.nio.channels.ClosedChannelException
 at
 sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
 at
 org.apache.spark.network.SendingConnection.read(Connection.scala:390)
 at

 org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
 SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 java.nio.channels.ClosedChannelException
 at
 sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
   

Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-11-11 Thread Davies Liu
There is an open PR [1] to support broadcasts larger than 2G; could you try it?

[1] https://github.com/apache/spark/pull/2659
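
For reference, a minimal PySpark sketch (not from this thread; the app name and
sizes are illustrative) of the kind of job that runs into the current ~2G
broadcast limit the PR above is meant to lift: a single broadcast value whose
serialized size exceeds 2G.

from pyspark import SparkContext

sc = SparkContext(appName="broadcast_2g_repro")  # hypothetical app name

# Roughly 2.5G of raw bytes; serializing and shipping a single value this
# large is the sort of thing that fails without the patch.
big_value = bytearray(2500 * 1024 * 1024)
bc = sc.broadcast(big_value)

# Each task just reads the size of the broadcast value.
print(sc.parallelize(range(4), 4).map(lambda _: len(bc.value)).collect())
sc.stop()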

On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon mr.tom.sed...@gmail.com wrote:
 Hi,

 Just wondering if anyone has any advice about this issue, as I am
 experiencing the same thing.  I'm working with multiple broadcast variables
 in PySpark, most of which are small, but one of around 4.5GB, using 10
 workers at 31GB memory each and driver with same spec.  It's not running out
 of memory as far as I can see, but definitely only happens when I add the
 large broadcast.  Would be most grateful for advice.

 I tried playing around with the last 3 conf settings below, but no luck:

 SparkConf().set("spark.master.memory", "26")
 .set("spark.executor.memory", "26")
 .set("spark.worker.memory", "26")
 .set("spark.driver.memory", "26")
 .set("spark.storage.memoryFraction", "1")
 .set("spark.core.connection.ack.wait.timeout", "6000")
 .set("spark.akka.frameSize", "50")

There are some invalid configs here: spark.master.memory and
spark.worker.memory are not valid settings.

spark.storage.memoryFraction is also too large; if it is set to 1 you will
have no memory left for general use (such as shuffle).

The Python jobs run in separate processes, so you should leave some memory
for them on the slaves. For example, if a node has 8 CPUs, each Python
process will need at least 8G (for the 4G broadcast plus object overhead in
Python), so you can only run about 3 such processes at the same time; use
spark.cores.max=2 or spark.task.cpus=4. You could also set
spark.executor.memory to only 10G, leaving 16G for Python.

Also, you could set spark.python.worker.memory=8g to get better shuffle
performance in Python (this is not strictly necessary).
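
Putting that together, a minimal sketch of such a configuration (the values
are illustrative for 31G, 8-core workers, not taken from this thread) might
look like:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("large_broadcast_job")                # hypothetical app name
        .set("spark.executor.memory", "10g")              # JVM heap per executor
        .set("spark.task.cpus", "4")                       # fewer concurrent tasks, more memory per Python worker
        .set("spark.python.worker.memory", "8g")           # optional: Python-side aggregation/shuffle memory
        .set("spark.core.connection.ack.wait.timeout", "6000"))

sc = SparkContext(conf=conf)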

So, for a large broadcast, maybe you should use Scala: executors there run
tasks in multiple threads, so the broadcast is shared by all the tasks in
the same executor.

 Thanks,

 Tom


 On 24 October 2014 12:31, htailor hemant.tai...@live.co.uk wrote:

 Hi All,

 I am relatively new to Spark and am currently having trouble broadcasting
 large variables ~500mb in size. The broadcast fails with the error shown
 below, and the memory usage on the hosts also blows up.

 Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb
 (workers))
 and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

 We have managed to replicate the error using the test script shown below. I
 would be interested to know if anyone has seen this before with broadcasting
 or knows of a fix.

 === ERROR ==

 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 java.nio.channels.CancelledKeyException
 at

 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 java.nio.channels.CancelledKeyException
 at

 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 

Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-10-24 Thread htailor
Hi All,

I am relatively new to Spark and am currently having trouble broadcasting
large variables ~500mb in size. The broadcast fails with the error shown
below, and the memory usage on the hosts also blows up.

Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb (workers))
and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

We have managed to replicate the error using the test script shown below. I
would be interested to know if anyone has seen this before with broadcasting
or knows of a fix.

=== ERROR ==

14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
sun.nio.ch.SelectionKeyImpl@fbc6caf
14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@fbc6caf
java.nio.channels.CancelledKeyException
at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
sun.nio.ch.SelectionKeyImpl@3ecfdb7e
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@3ecfdb7e
java.nio.channels.CancelledKeyException
at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
java.nio.channels.ClosedChannelException
at 
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
java.nio.channels.ClosedChannelException
at 
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/10/24 08:20:27 INFO ConnectionManager: Handling connection error on
connection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)


=== PYTHON SCRIPT ==

#!/usr/bin/pyspark

import subprocess
from random import choice
import string

from pyspark import SparkContext, SparkConf

path_hdfs_broadcast_test = "broadcast_test/general_test_test"

subprocess.Popen(["hdfs", "dfs", "-rm", "-r", path_hdfs_broadcast_test],
                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

sconf =