[ https://issues.apache.org/jira/browse/SPARK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14053994#comment-14053994 ]

Xiangrui Meng commented on SPARK-1977:
--------------------------------------

I think I now understand when this happens. We use storage level MEMORY_AND_DISK 
for the user/product in/out-link RDDs, which contain BitSet objects. If the 
dataset is large, these RDDs get pushed from in-memory storage to on-disk 
storage, and the latter requires serialization. So the easiest way to reproduce 
this error is to change the storage level of inLinks/outLinks to DISK_ONLY and 
run with Kryo.
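
For reference, here is a minimal sketch of that reproduction as a standalone 
app (FakeOutLinkBlock and the class names are illustrative, not the actual ALS 
internals; it just mirrors OutLinkBlock's Array[mutable.BitSet] field and 
forces serialization by persisting to disk):

{code}
// Hypothetical repro app: mirrors OutLinkBlock's Array[mutable.BitSet]
// field and forces Kryo (de)serialization via DISK_ONLY storage.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable

case class FakeOutLinkBlock(shouldSend: Array[mutable.BitSet])

object BitSetKryoRepro {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("BitSetKryoRepro")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 1000)
      .map(i => FakeOutLinkBlock(Array(mutable.BitSet(i % 64))))
      .persist(StorageLevel.DISK_ONLY) // on-disk storage requires serialization

    rdd.count() // reading the blocks back should trigger the Kryo failure
    sc.stop()
  }
}
{code}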

[~neville] Instead of mapping mutable.BitSet to immutable.BitSet, which 
introduces overhead, we can register mutable.BitSet in our MovieLensALS example 
code and wait for the next Chill release. Does that sound good to you?
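
A minimal sketch of that registration, assuming a custom KryoRegistrator wired 
up via spark.kryo.registrator (the registrator class name is illustrative):

{code}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Illustrative registrator class; in MovieLensALS this would sit next to
// the example's existing setup code.
class ALSRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register mutable.BitSet explicitly until a Chill release covers it.
    kryo.register(classOf[scala.collection.mutable.BitSet])
  }
}

object Example {
  // Point Spark's Kryo serializer at the registrator.
  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
}
{code}

This avoids the per-block copy to immutable.BitSet and can be dropped once 
Chill registers mutable.BitSet itself.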

> mutable.BitSet in ALS not serializable with KryoSerializer
> ----------------------------------------------------------
>
>                 Key: SPARK-1977
>                 URL: https://issues.apache.org/jira/browse/SPARK-1977
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.0.0
>            Reporter: Neville Li
>            Priority: Minor
>
> OutLinkBlock in ALS.scala has an Array[mutable.BitSet] member.
> KryoSerializer uses AllScalaRegistrar from Twitter chill, but it doesn't 
> register mutable.BitSet.
> Right now we have to register mutable.BitSet manually. A proper fix would be 
> to use immutable.BitSet in ALS or to register mutable.BitSet in upstream 
> chill.
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1724.0:9 failed 4 times, most recent failure: Exception failure in TID 68548 on host lon4-hadoopslave-b232.lon4.spotify.net: com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>         com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>         org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:155)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
>         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>         java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         java.lang.Thread.run(Thread.java:662)
> Driver stacktrace:
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
