Hi everybody, I'm trying to build a basic recommender with Spark and Mahout in
Scala. I used the following Mahout fork to compile Mahout against Scala 2.11 and
Spark 2.1.2: https://github.com/actionml/mahout/tree/sparse-speedup-13.0
To execute my code I use spark-submit. It runs fine with --master local, but
when I try to run it on a cluster with --master
spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.
Command (runs fine):

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master local \
  target/scala-2.11/recomender-0.0.1.jar
Command (fails):

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master spark://vagrant-ubuntu-trusty-64:7077 \
  target/scala-2.11/recomender-0.0.1.jar
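
For completeness, one variant I was considering is shipping the Mahout jars to
the executors explicitly with --jars. This is only a sketch: the jar paths are
placeholders for wherever the fork's build puts its artifacts, and I have not
verified it helps:

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master spark://vagrant-ubuntu-trusty-64:7077 \
  --jars /path/to/mahout/mahout-spark_2.11-0.13.0.jar,\
/path/to/mahout/mahout-math-scala_2.11-0.13.0.jar,\
/path/to/mahout/mahout-math-0.13.0.jar,\
/path/to/mahout/mahout-hdfs-0.13.0.jar \
  target/scala-2.11/recomender-0.0.1.jar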
Dependencies in build.sbt:
name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"
libraryDependencies ++= {
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided" ,
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided" ,
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
/* Mahout */
"org.apache.mahout" %% "mahout-spark" % mahoutVersion
exclude("org.apache.spark", "spark-core_2.11")
exclude("org.apache.spark", "spark-sql_2.11"),
"org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
"org.apache.mahout" % "mahout-math" % mahoutVersion,
"org.apache.mahout" % "mahout-hdfs" % mahoutVersion
exclude("com.thoughtworks.xstream", "xstream")
exclude("org.apache.hadoop", "hadoop-client")
)
}
resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal
…
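
Since the Spark dependencies are marked "provided" but the Mahout ones are not,
I assume the Mahout classes have to reach the executors somehow, e.g. via a fat
jar. A minimal sketch with sbt-assembly (the plugin version and merge strategy
here are assumptions, not what I currently use):

// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt: collapse duplicate files when merging dependency jars
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

With that, sbt assembly would produce
target/scala-2.11/recomender-assembly-0.0.1.jar, which I would pass to
spark-submit instead of the plain jar.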
Main class:
package com.reco
import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object GenerateIndicator {
  def main(args: Array[String]) {
    try {
      // Create spark-conf
      val sparkConf = new SparkConf().setAppName("recomender")
      implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
        masterUrl = sparkConf.get("spark.master"),
        appName = "recomender",
        sparkConf = sparkConf,
        // addMahoutJars = true,
        addMahoutJars = false
      )
      implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
      val sparkSession = SparkSession
        .builder()
        .appName("recomender")
        .config(sparkConf)
        .getOrCreate()
      val lines = returnData()
      val linesRdd = sdc.sc.parallelize(lines)
      println("...Collecting...")
      linesRdd.collect().foreach(item => { // ERROR HERE! on collect()
        println(item)
      })
      // Destroy Spark Session
      sparkSession.stop()
      sparkSession.close()
    } catch {
      case e: Exception =>
        println(e)
        throw new Exception(e)
    }
  }

  def returnData(): Array[String] = {
    val lines = Array(
      "17,Action",
      "17,Comedy",
      "17,Crime",
      "17,Horror",
      "17,Thriller",
      "12,Crime",
      "12,Thriller",
      "16,Comedy",
      "16,Romance",
      "20,Drama",
      "20,Romance",
      "7,Drama",
      "7,Sci-Fi",
      // ... more lines in array ...
      "1680,Drama",
      "1680,Romance",
      "1681,Comedy"
    )
    lines
  }
}
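
In case it helps, this is the context setup I was going to try next. It is only
a sketch: I am assuming Mahout's Kryo registrator has to be configured so both
driver and executors serialize the same way, and that addMahoutJars = true needs
MAHOUT_HOME to point at the built fork; I have not confirmed it fixes anything:

import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf

// Sketch: use Kryo with Mahout's registrator so executors can deserialize
// Mahout types, and let Mahout ship its own jars to the workers.
val conf = new SparkConf()
  .setAppName("recomender")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")

implicit val sdc = mahoutSparkContext(
  masterUrl = conf.get("spark.master"),
  appName = "recomender",
  sparkConf = conf,
  addMahoutJars = true // assumption: requires MAHOUT_HOME to be set
)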
Error:
18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block data) [duplicate 7]
18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at GenerateIndicator.scala:38, took 5.265593 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Thanks a lot for your time.
Cheers.