Hi,
Recently I have been trying Mahout on Spark, for example:
./bin/mahout spark-itemsimilarity -i ${input} -o ${output} --master $MyMaster --sparkExecutorMem 2g
Then I got an error like "Caused by: java.lang.ClassCastException: org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator cannot be cast to org.apache.spark.serializer.KryoRegistrator". It seems that our Spark version is NOT compatible with Mahout.
Since the Spark cluster is deployed by our ops team, I have to work with whatever version they provide.
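
My understanding (not verified, so please correct me) is that on each executor Spark loads the class configured as the Kryo registrator by name and casts it to KryoRegistrator, so a ClassCastException like this usually means the executor sees two different copies of the KryoRegistrator interface: the Spark 1.1.1 classes the Mahout jars were built against, and the cluster's 1.3.0 classes. A self-contained sketch of that load-and-cast pattern (the object RegistratorLoadSketch and the variable names are mine, only for illustration; it needs mahout-spark and spark-core on the classpath):

import org.apache.spark.serializer.KryoRegistrator

object RegistratorLoadSketch {
  def main(args: Array[String]): Unit = {
    // Mahout hands this class name to Spark (via spark.kryo.registrator);
    // Spark instantiates it reflectively on the executor.
    val registratorClassName = "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator"
    val loader = Thread.currentThread().getContextClassLoader
    // If the KryoRegistrator interface visible through `loader` is not the same
    // Class object that MahoutKryoRegistrator implements (two Spark versions on
    // the classpath), this asInstanceOf throws the ClassCastException shown in
    // the stack trace at the end of this mail.
    val reg = Class.forName(registratorClassName, true, loader)
      .newInstance()
      .asInstanceOf[KryoRegistrator]
    println(s"loaded registrator: ${reg.getClass.getName}")
  }
}
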
So I tried the following:
1. Changed <spark.version>1.1.1</spark.version> to <spark.version>1.3.0</spark.version> (our Spark version) in Mahout's pom.xml.
2. Ran mvn -DskipTests clean install
3. Got these errors during the build:
[ERROR] spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala:168: error: value saveAsSequenceFile is not a member of org.apache.mahout.sparkbindings.DrmRdd[K]
[ERROR]   rdd.saveAsSequenceFile(path)
[ERROR]       ^
[ERROR] spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala:26: error: object FilteredRDD is not a member of package org.apache.spark.rdd
[ERROR] import org.apache.spark.rdd.{FilteredRDD, RDD}
4. Checked Spark 1.3.0: FilteredRDD has been removed (see my guess at a fix after this list).
5. Checked Spark 1.1.1: FilteredRDD is still there.
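
For the FilteredRDD error, my guess is that it can be handled with the public RDD API: FilteredRDD was removed in Spark 1.3.0, but RDD.filter exists in both 1.1.x and 1.3.x and does the same job, so drm/package.scala could presumably drop the import and call filter directly. A minimal sketch of what I mean (the helper name `filtered` is mine, not from the Mahout sources):

import org.apache.spark.rdd.RDD

// what `new FilteredRDD(prev, f)` produced, expressed through the public API
def filtered[T](prev: RDD[T], f: T => Boolean): RDD[T] = prev.filter(f)

I am less sure about the saveAsSequenceFile error. As far as I can tell, the implicit conversions that add saveAsSequenceFile to an RDD were reorganized in Spark 1.3 and their signatures changed, so CheckpointedDrmSpark.scala probably needs its imports/implicits adjusted as well, but I have not worked out the exact change.
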
So, my question is: how can I solve this?
The full error output when I run ./bin/mahout spark-itemsimilarity -i ${input} -o ${output} --master $MyMaster --sparkExecutorMem 2g is below:
15/05/22 12:22:27 WARN TaskSetManager: Lost task 8.0 in stage 0.0 (TID 8, 182.118.21.30): java.io.IOException: org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1008)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:105)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:157)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:214)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1005)
        ... 12 more
Caused by: java.lang.ClassCastException: org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator cannot be cast to org.apache.spark.serializer.KryoRegistrator
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:101)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:101)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
        ... 17 more