Hey all,
I'm trying to run connected components in GraphX on about 400 GB of data on 50
m3.xlarge nodes on EMR. I keep getting java.nio.channels.CancelledKeyException
when the job reaches "mapPartitions at VertexRDD.scala:347". I haven't been able
to find much about this online, and nothing that seems relevant to my situation.
I'm using Spark built from master, because I need the custom storage level
feature in GraphX.
My configuration:
.set("spark.executor.memory", "13g")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
.set("spark.akka.frameSize", "128")
.set("spark.storage.memoryFraction", "0.2")
.set("spark.shuffle.memoryFraction", "0.7")
.set("spark.akka.timeout", "600")
The code I’m running:
val graph = GraphLoader.edgeListFile(sc, "/foo/bar",
  minEdgePartitions = 2000,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
val cc = graph.connectedComponents().vertices
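(For completeness, here is the whole driver as one self-contained sketch of what I'm running; the "/foo/bar" path is a placeholder for my real input, and the config values are the ones listed above.)

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

object SparkCC {
  def main(args: Array[String]): Unit = {
    // Same settings as listed above.
    val conf = new SparkConf()
      .setAppName("Spark CC")
      .set("spark.executor.memory", "13g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      .set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
      .set("spark.akka.frameSize", "128")
      .set("spark.storage.memoryFraction", "0.2")
      .set("spark.shuffle.memoryFraction", "0.7")
      .set("spark.akka.timeout", "600")
    val sc = new SparkContext(conf)

    // "/foo/bar" is a placeholder for the real edge-list path.
    val graph = GraphLoader.edgeListFile(sc, "/foo/bar",
      minEdgePartitions = 2000,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
    val cc = graph.connectedComponents().vertices
    println(cc.count())
  }
}
```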
Stack trace:
14/08/18 18:50:07 INFO network.ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@501c1504
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
at java.nio.channels.SelectionKey.isAcceptable(SelectionKey.java:360)
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:372)
at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/08/18 18:50:07 INFO cluster.SparkDeploySchedulerBackend: Executor 48
disconnected, so removing it
14/08/18 18:50:07 ERROR network.SendingConnection: Exception while reading
SendingConnection to ConnectionManagerId(ip-10-136-91-34.ec2.internal,51095)
java.nio.channels.ClosedChannelException
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/08/18 18:50:07 INFO storage.BlockManagerInfo: Added rdd_16_161 in memory on
ip-10-150-84-111.ec2.internal:58219 (size: 78.4 MB, free: 716.5 MB)
14/08/18 18:50:07 ERROR scheduler.TaskSchedulerImpl: Lost executor 48 on
ip-10-136-91-34.ec2.internal: remote Akka client disassociated
Log from the worker node in spark/logs/:
14/08/18 18:40:52 INFO worker.ExecutorRunner: Launch command:
/usr/lib/jvm/java-7-oracle/bin/java -cp
/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/:/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/::/home/hadoop/spark/conf:/home/hadoop/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.3.jar:/home/hadoop/conf/
-XX:MaxPermSize=128m -Dspark.akka.timeout=600 -Dspark.akka.frameSize=128
-Dspark.driver.port=50456 -XX:-UseGCOverheadLimit -Xms13312M -Xmx13312M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@ip-10-171-49-153.ec2.internal:50456/user/CoarseGrainedScheduler
48 ip-10-136-91-34.ec2.internal 4
akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942/user/Worker
app-20140818184051-
240.261: [GC [PSYoungGen: 132096K->18338K(153600K)] 132096K->18346K(503296K),
0.0285710 secs] [Times: user=0.04 sys=0.02, real=0.03 secs]
14/08/18 18:50:07 INFO worker.Worker: Executor app-20140818184051-/48
finished with state EXITED message Command exited with code 137 exitStatus 137
14/08/18 18:50:07 INFO worker.Worker: Asked to launch executor
app-20140818184051-/50 for Spark CC
14/08/18 18:50:07 INFO actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.136.91.34%3A47436-2#201455087]
was not delivered. [1] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/08/18 18:50:07 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942] -
[akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]: Error
[Association failed with
[akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]] [
akka.remote.EndpointAssociationException: Association