On Wed, Aug 1, 2018 at 8:54 AM Jaume Galí <jg...@konodrac.com> wrote:
Hi everybody,

I'm trying to build a basic recommender with Spark and Mahout in Scala. I use the following Mahout fork to compile Mahout against Scala 2.11 and Spark 2.1.2:
https://github.com/actionml/mahout/tree/sparse-speedup-13.0

To execute my code I use spark-submit. It runs fine when I pass --master local, but when I try to run on a cluster with --master spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.

Command (runs fine):

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master local \
  target/scala-2.11/recomender-0.0.1.jar

Command (fails):

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master spark://vagrant-ubuntu-trusty-64:7077 \
  target/scala-2.11/recomender-0.0.1.jar

Dependencies in build.sbt:

name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"

libraryDependencies ++= {
  Seq(
    "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
    /* Mahout */
    "org.apache.mahout" %% "mahout-spark" % mahoutVersion
      exclude("org.apache.spark", "spark-core_2.11")
      exclude("org.apache.spark", "spark-sql_2.11"),
    "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
    "org.apache.mahout" %  "mahout-math" % mahoutVersion,
    "org.apache.mahout" %  "mahout-hdfs" % mahoutVersion
      exclude("com.thoughtworks.xstream", "xstream")
      exclude("org.apache.hadoop", "hadoop-client")
  )
}

resolvers += "Local Repository" at "file://" + baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal

…

Main class:

package com.reco

import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GenerateIndicator {

  def main(args: Array[String]) {
    try {

      // Create spark-conf
      val sparkConf = new SparkConf().setAppName("recomender")

      // Create a Mahout distributed context on top of Spark
      implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
        masterUrl = sparkConf.get("spark.master"),
        appName = "recomender",
        sparkConf = sparkConf,
        // addMahoutJars = true,
        addMahoutJars = false
      )

      implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)

      val sparkSession = SparkSession
        .builder()
        .appName("recomender")
        .config(sparkConf)
        .getOrCreate()

      val lines = returnData()

      // Distribute the test data across the cluster
      val linesRdd = sdc.sc.parallelize(lines)

      println("...Collecting...")

      linesRdd.collect().foreach( item => { // ERROR HERE! on collect()
        println(item)
      })

      // Destroy Spark Session
      sparkSession.stop()
      sparkSession.close()

    } catch {
      case e: Exception =>
        println(e)
        throw new Exception(e)
    }
  }

  def returnData(): Array[String] = {
    val lines = Array(
      "17,Action",
      "17,Comedy",
      "17,Crime",
      "17,Horror",
      "17,Thriller",
      "12,Crime",
      "12,Thriller",
      "16,Comedy",
      "16,Romance",
      "20,Drama",
      "20,Romance",
      "7,Drama",
      "7,Sci-Fi",
      // ... more lines in array ...
      "1680,Drama",
      "1680,Romance",
      "1681,Comedy"
    )
    lines
  }
}
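(Side note on the jars: with addMahoutJars = false, my understanding is that nothing ships the Mahout classes to the executors, so the sketch below is roughly what I would expect to be needed instead. The jar paths are placeholders for illustration, not my real ones.)

import org.apache.spark.SparkConf

// Sketch only: register the application jar and the Mahout jars on the
// SparkConf so the executors can load the Mahout classes. All paths below
// are placeholders.
val sparkConf = new SparkConf()
  .setAppName("recomender")
  .setJars(Seq(
    "target/scala-2.11/recomender-0.0.1.jar",      // the application jar itself
    "/path/to/mahout-spark_2.11-0.13.0.jar",       // placeholder paths to the
    "/path/to/mahout-math-scala_2.11-0.13.0.jar",  // jars built from the fork
    "/path/to/mahout-math-0.13.0.jar",
    "/path/to/mahout-hdfs-0.13.0.jar"
  ))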
> "1680,Drama", > "1680,Romance", > "1681,Comedy" > ) > lines > } > > } > Error:: > > 18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at > GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to > stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: > Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): > java.lang.IllegalStateException: unread block data > at > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > Driver stacktrace: > 18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) > on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block > data) [duplicate 7] > 18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks > have all completed, from pool > 18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at > GenerateIndicator.scala:38, took 5.265593 s > org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 > in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage > 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread > block data > at > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Thanks a lot for your time. > Cheers. -- Eric Link 214.641.5465