Hello,

I am running the BroadcastTest example in a standalone cluster via spark-submit. I have 8 host machines: Host1 is the master, and Host2 through Host8 act as 7 workers connected to it. The connection is fine, as all 7 workers show up on the master web UI. The BroadcastTest example with Http broadcast also appears to work fine: there is no error message, and all workers end with an "EXITED" status. But when I change the third argument from "Http" to "Torrent" to use Torrent broadcast, all workers get a "KILLED" status once they reach sc.stop().
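For reference, this is roughly how I launch the example (the class name, master URL, jar path, and the first two argument values below are illustrative placeholders; only the third application argument, the broadcast factory name, is what I change between the two runs):

```shell
# Illustrative spark-submit invocation; host name, jar path, class name,
# and the first two arguments are placeholders. The third application
# argument ("Http" or "Torrent") selects the broadcast implementation.
./bin/spark-submit \
  --class BroadcastTest \
  --master spark://Host1:7077 \
  broadcast-test_2.10-1.0.jar \
  2 4000000 Torrent
```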
Below is the stderr on one of the workers when running Torrent broadcast (I masked the IP addresses):

==========================================================================================
14/07/02 18:20:03 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:03 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/CoarseGrainedScheduler
14/07/02 18:20:04 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO Remoting: Remoting shut down
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/07/02 18:20:04 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/MapOutputTracker
14/07/02 18:20:05 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/BlockManagerMaster
14/07/02 18:20:05 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140702182005-30bd
14/07/02 18:20:05 INFO ConnectionManager: Bound socket to port 60368 with id = ConnectionManagerId(dyn-xxx-xx-xx-xx,60368)
14/07/02 18:20:05 INFO MemoryStore: MemoryStore started with capacity 294.6 MB
14/07/02 18:20:05 INFO BlockManagerMaster: Trying to register BlockManager
14/07/02 18:20:05 INFO BlockManagerMaster: Registered BlockManager
14/07/02 18:20:05 INFO HttpFileServer: HTTP File server directory is /tmp/spark-35f65442-e0e8-4122-9359-ca8232ca97a6
14/07/02 18:20:05 INFO HttpServer: Starting HTTP Server
14/07/02 18:20:06 INFO CoarseGrainedExecutorBackend: Got assigned task 9
14/07/02 18:20:06 INFO Executor: Running task ID 9
14/07/02 18:20:06 INFO Executor: Fetching http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar with timestamp 1404339601903
14/07/02 18:20:06 INFO Utils: Fetching http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar to /tmp/fetchFileTemp5382215579021312284.tmp
14/07/02 18:20:06 INFO BlockManager: Removing broadcast 0
14/07/02 18:20:07 INFO Executor: Adding file:/home/lrl/Desktop/spark-master/work/app-20140702182002-0006/3/./broadcast-test_2.10-1.0.jar to class loader
14/07/02 18:20:07 INFO TorrentBroadcast: Started reading broadcast variable 1
14/07/02 18:20:07 INFO SendingConnection: Initiating connection to [dyn-xxx-xx-xx-xx:60179]
14/07/02 18:20:07 INFO SendingConnection: Connected to [dyn-xxx-xx-xx-xx:60179], 1 messages pending
14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from [DCTB-Host1/xxx.xx.xx.xx]
14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_meta remotely
14/07/02 18:20:07 INFO SendingConnection: Initiating connection to [dyn-xxx-xx-xx-xx:55273]
14/07/02 18:20:07 INFO SendingConnection: Connected to [dyn-xxx-xx-xx-xx:55273], 1 messages pending
14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from [dyn-xxx-xx-xx-xxx]
14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_piece0 remotely
14/07/02 18:20:07 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/07/02 18:20:07 INFO MemoryStore: ensureFreeSpace(4000168) called with curMem=0, maxMem=308910489
14/07/02 18:20:07 INFO MemoryStore: Block broadcast_1_piece0 stored as values in memory (estimated size 3.8 MB, free 290.8 MB)
14/07/02 18:20:07 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=4000168, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.8 MB, free 287.0 MB)
14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 1 took 0.909187542 s
14/07/02 18:20:08 INFO Executor: Serialized size of result for 9 is 599
14/07/02 18:20:08 INFO Executor: Sending result for 9 directly to driver
14/07/02 18:20:08 INFO Executor: Finished task ID 9
14/07/02 18:20:08 INFO CoarseGrainedExecutorBackend: Got assigned task 13
14/07/02 18:20:08 INFO Executor: Running task ID 13
14/07/02 18:20:08 INFO TorrentBroadcast: Started reading broadcast variable 2
14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_meta remotely
14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_piece0 remotely
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000168) called with curMem=8000288, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2_piece0 stored as values in memory (estimated size 3.8 MB, free 283.2 MB)
14/07/02 18:20:08 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=12000456, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 MB, free 279.3 MB)
14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 2 took 0.269456367 s
14/07/02 18:20:08 INFO Executor: Serialized size of result for 13 is 599
14/07/02 18:20:08 INFO Executor: Sending result for 13 directly to driver
14/07/02 18:20:08 INFO Executor: Finished task ID 13
14/07/02 18:20:09 INFO BlockManager: Removing broadcast 2
14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2
14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2 of size 4000120 dropped from memory (free 296910033)
14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2_piece0
14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2_piece0 of size 4000168 dropped from memory (free 300910201)
14/07/02 18:20:09 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/02 18:20:09 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/02 18:20:09 INFO Remoting: Remoting shut down
14/07/02 18:20:10 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@1973a69
14/07/02 18:20:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dyn-xxx-xx-xx-xx,60179)
14/07/02 18:20:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(DCTB-Host1,60179)
14/07/02 18:20:10 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
14/07/02 18:20:10 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@1973a69
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
==========================================================================================

Also, here is the output when running Http broadcast, for comparison (IP addresses masked):

==========================================================================================
14/07/02 18:02:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/CoarseGrainedScheduler
14/07/02 18:02:05 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:02:05 INFO Remoting: Remoting shut down
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/07/02 18:02:05 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
14/07/02 18:02:05 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/MapOutputTracker
14/07/02 18:02:05 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/BlockManagerMaster
14/07/02 18:02:05 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140702180205-113e
14/07/02 18:02:05 INFO ConnectionManager: Bound socket to port 48270 with id = ConnectionManagerId(dyn-xxx-xx-xx-xx,48270)
14/07/02 18:02:05 INFO MemoryStore: MemoryStore started with capacity 294.6 MB
14/07/02 18:02:05 INFO BlockManagerMaster: Trying to register BlockManager
14/07/02 18:02:05 INFO BlockManagerMaster: Registered BlockManager
14/07/02 18:02:05 INFO HttpFileServer: HTTP File server directory is /tmp/spark-5c87e636-faaa-489a-b5a6-c9100cfe4dc5
14/07/02 18:02:05 INFO HttpServer: Starting HTTP Server
14/07/02 18:02:06 INFO CoarseGrainedExecutorBackend: Got assigned task 11
14/07/02 18:02:06 INFO Executor: Running task ID 11
14/07/02 18:02:06 INFO Executor: Fetching http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar with timestamp 1404338522319
14/07/02 18:02:06 INFO Utils: Fetching http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar to /tmp/fetchFileTemp4777840901789178395.tmp
14/07/02 18:02:06 INFO Executor: Adding file:/home/lrl/Desktop/spark-master/work/app-20140702180202-0001/3/./broadcast-test_2.10-1.0.jar to class loader
14/07/02 18:02:06 INFO HttpBroadcast: Started reading broadcast variable 1
14/07/02 18:02:06 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/07/02 18:02:06 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=0, maxMem=308910489
14/07/02 18:02:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.8 MB, free 290.8 MB)
14/07/02 18:02:06 INFO HttpBroadcast: Reading broadcast variable 1 took 0.121600945 s
14/07/02 18:02:06 INFO Executor: Serialized size of result for 11 is 599
14/07/02 18:02:06 INFO Executor: Sending result for 11 directly to driver
14/07/02 18:02:06 INFO Executor: Finished task ID 11
14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Got assigned task 18
14/07/02 18:02:07 INFO Executor: Running task ID 18
14/07/02 18:02:07 INFO HttpBroadcast: Started reading broadcast variable 2
14/07/02 18:02:07 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=4000120, maxMem=308910489
14/07/02 18:02:07 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 MB, free 287.0 MB)
14/07/02 18:02:07 INFO HttpBroadcast: Reading broadcast variable 2 took 0.234879208 s
14/07/02 18:02:07 INFO Executor: Serialized size of result for 18 is 599
14/07/02 18:02:07 INFO Executor: Sending result for 18 directly to driver
14/07/02 18:02:07 INFO Executor: Finished task ID 18
14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/02 18:02:07 INFO Remoting: Remoting shut down
==========================================================================================

It seems to me that the error does not occur until broadcasting has already finished. Still, I would like to understand why such an error message appears at sc.stop(). If it is an issue with sc.stop(), why does it happen only with Torrent broadcast and not with Http broadcast? Any insights on this will be very much appreciated. Thanks in advance!

Best,
Jack

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-the-BroadcastTest-scala-with-TorrentBroadcastFactory-in-a-standalone-cluster-tp8736.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.