Hey all,

I'm trying to run connected components in GraphX on about 400GB of data, on
50 m3.xlarge nodes on EMR. I keep getting java.nio.channels.CancelledKeyException
when the job reaches the "mapPartitions at VertexRDD.scala:347" stage. I
haven't been able to find much about this online, and nothing that seems
relevant to my situation. I'm using Spark built from master, because I need
the custom storage level support in GraphX (used in the GraphLoader call below).

My configuration:

                .set("spark.executor.memory", "13g")
                .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
                
.set("spark.kryo.registrator","org.apache.spark.graphx.GraphKryoRegistrator")
                
.set("spark.executor.extraJavaOptions","-XX:-UseGCOverheadLimit")
                .set("spark.akka.frameSize", "128")
                .set("spark.storage.memoryFraction", "0.2")
                .set("spark.shuffle.memoryFraction", "0.7")
                .set("spark.akka.timeout","600”)

The code I'm running:

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

val graph = GraphLoader.edgeListFile(sc, "/foo/bar",
  minEdgePartitions = 2000,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)

val cc = graph.connectedComponents().vertices
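
For what it's worth, downstream I just consume the component IDs; here is a
hypothetical stand-in for the real output step (any action along these lines
exercises the same machinery):

// cc is a VertexRDD[VertexId], i.e. pairs of (vertexId, componentId).
// Counting distinct component IDs is a cheap sanity check; the real job
// saves the output instead (hypothetical action for illustration).
println(cc.map(_._2).distinct().count())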

Stack trace:

14/08/18 18:50:07 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@501c1504
java.nio.channels.CancelledKeyException
        at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
        at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
        at java.nio.channels.SelectionKey.isAcceptable(SelectionKey.java:360)
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:372)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/08/18 18:50:07 INFO cluster.SparkDeploySchedulerBackend: Executor 48 disconnected, so removing it
14/08/18 18:50:07 ERROR network.SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-136-91-34.ec2.internal,51095)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/08/18 18:50:07 INFO storage.BlockManagerInfo: Added rdd_16_161 in memory on ip-10-150-84-111.ec2.internal:58219 (size: 78.4 MB, free: 716.5 MB)
14/08/18 18:50:07 ERROR scheduler.TaskSchedulerImpl: Lost executor 48 on ip-10-136-91-34.ec2.internal: remote Akka client disassociated

Log from the worker node in spark/logs/. Note that the executor exits with
code 137 (128 + 9, i.e. SIGKILL), which makes me suspect the kernel OOM
killer, since a 13g heap plus JVM overhead is close to the m3.xlarge's 15GB
of RAM:

14/08/18 18:40:52 INFO worker.ExecutorRunner: Launch command: "/usr/lib/jvm/java-7-oracle/bin/java" "-cp" "/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/:/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/::/home/hadoop/spark/conf:/home/hadoop/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.3.jar:/home/hadoop/conf/" "-XX:MaxPermSize=128m" "-Dspark.akka.timeout=600" "-Dspark.akka.frameSize=128" "-Dspark.driver.port=50456" "-XX:-UseGCOverheadLimit" "-Xms13312M" "-Xmx13312M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@ip-10-171-49-153.ec2.internal:50456/user/CoarseGrainedScheduler" "48" "ip-10-136-91-34.ec2.internal" "4" "akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942/user/Worker" "app-20140818184051-0000"
240.261: [GC [PSYoungGen: 132096K->18338K(153600K)] 132096K->18346K(503296K), 0.0285710 secs] [Times: user=0.04 sys=0.02, real=0.03 secs]
14/08/18 18:50:07 INFO worker.Worker: Executor app-20140818184051-0000/48 finished with state EXITED message Command exited with code 137 exitStatus 137
14/08/18 18:50:07 INFO worker.Worker: Asked to launch executor app-20140818184051-0000/50 for Spark CC
14/08/18 18:50:07 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.136.91.34%3A47436-2#201455087] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/08/18 18:50:07 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942] -> [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-136-91-34.ec2.internal/10.136.91.34:52301
]
(the same AssociationError is logged twice more)
