java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2

2014-09-10 Thread Jeffrey Picard
Hey guys,

After rebuilding Spark from the master branch this morning, I’ve started seeing
errors I’d never gotten before while running connected components. Has anyone
seen this before?
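
For reference, a minimal sketch (not from the original post) of the kind of job that
hits this code path: building a graph and running connectedComponents() shuffles
GraphX routing-table messages, and with the sort-based shuffle that data is spilled
by ExternalSorter, which is where the trace below fails. The helper name and
edge-list path are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.GraphLoader

// Hypothetical reproduction sketch, assuming sc is a SparkContext on a cluster
// running the sort-based shuffle (spark.shuffle.manager=SORT, matching the
// SortShuffleWriter frames in the trace). "/path/to/edges" is a placeholder.
def runConnectedComponents(sc: SparkContext): Long = {
  // Loading the edge list and running connectedComponents() shuffles GraphX
  // routing-table messages; that shuffle is the spill that fails below.
  val graph = GraphLoader.edgeListFile(sc, "/path/to/edges")
  graph.connectedComponents().vertices.count()
}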

14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 87 spilling in-memory batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 58 spilling in-memory batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 57 spilling in-memory batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 1020 MB to disk (1 spill so far)
14/09/10 20:39:15 ERROR executor.Executor: Exception in task 275.0 in stage 3.0 (TID 994)
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
    at org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329)
    at org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/10 20:39:15 ERROR executor.Executor: Exception in task 176.0 in stage 3.0 (TID 894)
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
    at org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329)
    at org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)




java.nio.channels.CancelledKeyException in Graphx Connected Components

2014-08-18 Thread Jeffrey Picard
Hey all,

I’m trying to run connected components in GraphX on about 400 GB of data on 50
m3.xlarge nodes on EMR. I keep getting java.nio.channels.CancelledKeyException
when it gets to “mapPartitions at VertexRDD.scala:347”. I haven’t been able to
find much about this online, and nothing that seems relevant to my situation.
I’m using Spark built from the master branch, because I need the custom storage
level feature in GraphX.

My configuration:

.set("spark.executor.memory", "13g")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
.set("spark.akka.frameSize", "128")
.set("spark.storage.memoryFraction", "0.2")
.set("spark.shuffle.memoryFraction", "0.7")
.set("spark.akka.timeout", "600")

The code I’m running:

val graph = GraphLoader.edgeListFile(sc, "/foo/bar", minEdgePartitions = 2000,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)

val cc = graph.connectedComponents().vertices

Stack trace:

14/08/18 18:50:07 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@501c1504
java.nio.channels.CancelledKeyException
    at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
    at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
    at java.nio.channels.SelectionKey.isAcceptable(SelectionKey.java:360)
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:372)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/08/18 18:50:07 INFO cluster.SparkDeploySchedulerBackend: Executor 48 disconnected, so removing it
14/08/18 18:50:07 ERROR network.SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-136-91-34.ec2.internal,51095)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/08/18 18:50:07 INFO storage.BlockManagerInfo: Added rdd_16_161 in memory on ip-10-150-84-111.ec2.internal:58219 (size: 78.4 MB, free: 716.5 MB)
14/08/18 18:50:07 ERROR scheduler.TaskSchedulerImpl: Lost executor 48 on ip-10-136-91-34.ec2.internal: remote Akka client disassociated

Log from the worker node in spark/logs/:

14/08/18 18:40:52 INFO worker.ExecutorRunner: Launch command: 
/usr/lib/jvm/java-7-oracle/bin/java -cp 
/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/:/home/hadoop/spark/lib/*;/home/hadoop/lib/*;/home/hadoop/::/home/hadoop/spark/conf:/home/hadoop/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.3.jar:/home/hadoop/conf/
 -XX:MaxPermSize=128m -Dspark.akka.timeout=600 -Dspark.akka.frameSize=128 
-Dspark.driver.port=50456 -XX:-UseGCOverheadLimit -Xms13312M -Xmx13312M 
org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://spark@ip-10-171-49-153.ec2.internal:50456/user/CoarseGrainedScheduler
 48 ip-10-136-91-34.ec2.internal 4 
akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942/user/Worker 
app-20140818184051-
240.261: [GC [PSYoungGen: 132096K->18338K(153600K)] 132096K->18346K(503296K), 0.0285710 secs] [Times: user=0.04 sys=0.02, real=0.03 secs]
14/08/18 18:50:07 INFO worker.Worker: Executor app-20140818184051-/48 finished with state EXITED message Command exited with code 137 exitStatus 137
14/08/18 18:50:07 INFO worker.Worker: Asked to launch executor app-20140818184051-/50 for Spark CC
14/08/18 18:50:07 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.136.91.34%3A47436-2#201455087] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/08/18 18:50:07 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-136-91-34.ec2.internal:43942] -> [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-136-91-34.ec2.internal:52301]] [
akka.remote.EndpointAssociationException: Association

GraphX Connected Components

2014-07-29 Thread Jeffrey Picard
Hey all,

I’m currently trying to run connected components using GraphX on a large graph
(~1.8B vertices and ~3B edges, most of them self edges, where the only edge that
exists for vertex v is v-v) on EMR using 50 m3.xlarge nodes. As the program runs,
each iteration takes longer and longer to complete. This seems counterintuitive
to me, especially since the shuffle read/write amounts decrease with each
iteration; I would expect the iterations to get shorter as more and more vertices
converge. If I run on up to 150 of the 500 part files (stored on S3), it finishes
in about 12 minutes, but with all of the data I’ve let it run for up to 4 hours
and it still doesn’t complete. Does anyone have ideas for approaches to
troubleshooting this, Spark parameters that might need tuning, etc.?
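
For context (this is not from the original thread): GraphX’s connectedComponents is
essentially Pregel-style minimum-label propagation, so the number of iterations
tracks the diameter of the largest component rather than the total vertex count,
and the self-edge-only vertices drop out after the first round. A simplified
sketch, paraphrased from the GraphX implementation (names and structure are mine):

import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Simplified sketch of min-label propagation: every vertex starts labelled with
// its own id and repeatedly adopts the smallest id seen from a neighbor until
// no labels change. Vertices whose only edge is the self edge v-v send no
// messages and converge in the first round; later rounds only touch the
// shrinking frontier of the larger components.
def ccSketch[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  val ccGraph = graph.mapVertices { case (vid, _) => vid }
  Pregel(ccGraph, initialMsg = Long.MaxValue, activeDirection = EdgeDirection.Either)(
    vprog = (id, attr, msg) => math.min(attr, msg),
    sendMsg = edge =>
      if (edge.srcAttr < edge.dstAttr) Iterator((edge.dstId, edge.srcAttr))
      else if (edge.dstAttr < edge.srcAttr) Iterator((edge.srcId, edge.dstAttr))
      else Iterator.empty,
    mergeMsg = (a, b) => math.min(a, b))
}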

Best Regards,

Jeffrey Picard