To unsubscribe, send mail to user-unsubscr...@mahout.apache.org. You can leave the subject and body empty; sending anything to that address works.
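For example, from a shell with a working mail command (a minimal sketch; the unsubscribe address is truncated above, so it is left as a placeholder variable here):

# Assumption: UNSUB_ADDR holds the full unsubscribe address shown above.
UNSUB_ADDR="user-unsubscr...@mahout.apache.org"
# Subject and body may be empty; any message to this address unsubscribes.
echo | mail -s "" "$UNSUB_ADDR"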
Thanks, take care!

On Mon, Jul 22, 2019 at 10:16 AM Simanchal <simanchal...@gmail.com> wrote:
> unsubscribe
>
> On Wed, Aug 1, 2018 at 6:54 AM Jaume Galí <jg...@konodrac.com> wrote:
> >
> > Hi everybody, I'm trying to build a basic recommender with Spark and
> > Mahout in Scala. I used the following fork to compile Mahout against
> > Scala 2.11 and Spark 2.1.2:
> > https://github.com/actionml/mahout/tree/sparse-speedup-13.0
> >
> > To execute my code I use spark-submit. It runs fine with --master local,
> > but when I run it on a cluster with
> > --master spark://vagrant-ubuntu-trusty-64:7077 it always fails with the
> > same error.
> >
> > Command (runs fine):
> >
> > /opt/spark/bin/spark-submit \
> >   --class 'com.reco.GenerateIndicator' \
> >   --name recomender \
> >   --master local \
> >   target/scala-2.11/recomender-0.0.1.jar
> >
> > Command (fails):
> >
> > /opt/spark/bin/spark-submit \
> >   --class 'com.reco.GenerateIndicator' \
> >   --name recomender \
> >   --master spark://vagrant-ubuntu-trusty-64:7077 \
> >   target/scala-2.11/recomender-0.0.1.jar
> >
> > Dependencies in build.sbt:
> >
> > name := "recomender"
> > version := "0.0.1"
> > scalaVersion := "2.11.11"
> > val mahoutVersion = "0.13.0"
> > val sparkVersion = "2.1.2"
> >
> > libraryDependencies ++= {
> >   Seq(
> >     "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
> >     "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
> >     "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> >     /* Mahout */
> >     "org.apache.mahout" %% "mahout-spark" % mahoutVersion
> >       exclude("org.apache.spark", "spark-core_2.11")
> >       exclude("org.apache.spark", "spark-sql_2.11"),
> >     "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
> >     "org.apache.mahout" %  "mahout-math" % mahoutVersion,
> >     "org.apache.mahout" %  "mahout-hdfs" % mahoutVersion
> >       exclude("com.thoughtworks.xstream", "xstream")
> >       exclude("org.apache.hadoop", "hadoop-client")
> >   )
> > }
> >
> > resolvers += "Local Repository" at "file://" + baseDirectory.value / "repo"
> > resolvers += Resolver.mavenLocal
> >
> > …
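A side note on the build quoted above: the Spark artifacts are marked "provided", but the Mahout artifacts are not, so the Mahout classes only reach the executors if the submitted jar is a fat jar. A minimal sketch of an assembly setup, assuming the sbt-assembly plugin (the plugin and the merge strategy below are an assumption, not part of the original build):

// project/plugins.sbt (assumption: sbt-assembly 0.14.x is acceptable here)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt additions: discard META-INF duplicates that Mahout/Hadoop
// transitive jars commonly collide on, keep the first copy otherwise.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

Running `sbt assembly` then produces target/scala-2.11/recomender-assembly-0.0.1.jar, which can be passed to spark-submit in place of the plain package jar.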
> > Main class:
> >
> > package com.reco
> >
> > import org.apache.mahout.sparkbindings.SparkDistributedContext
> > import org.apache.mahout.sparkbindings._
> > import org.apache.spark.SparkConf
> > import org.apache.spark.sql.SparkSession
> >
> > object GenerateIndicator {
> >
> >   def main(args: Array[String]) {
> >     try {
> >
> >       // Create spark-conf
> >       val sparkConf = new SparkConf().setAppName("recomender")
> >
> >       implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
> >         masterUrl = sparkConf.get("spark.master"),
> >         appName = "recomender",
> >         sparkConf = sparkConf,
> >         // addMahoutJars = true,
> >         addMahoutJars = false
> >       )
> >
> >       implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
> >
> >       val sparkSession = SparkSession
> >         .builder()
> >         .appName("recomender")
> >         .config(sparkConf)
> >         .getOrCreate()
> >
> >       val lines = returnData()
> >
> >       val linesRdd = sdc.sc.parallelize(lines)
> >
> >       println("...Collecting...")
> >
> >       linesRdd.collect().foreach(item => { // ERROR HERE! on collect()
> >         println(item)
> >       })
> >
> >       // Destroy Spark Session
> >       sparkSession.stop()
> >       sparkSession.close()
> >
> >     } catch {
> >       case e: Exception =>
> >         println(e)
> >         throw new Exception(e)
> >     }
> >   }
> >
> >   def returnData(): Array[String] = {
> >     val lines = Array(
> >       "17,Action",
> >       "17,Comedy",
> >       "17,Crime",
> >       "17,Horror",
> >       "17,Thriller",
> >       "12,Crime",
> >       "12,Thriller",
> >       "16,Comedy",
> >       "16,Romance",
> >       "20,Drama",
> >       "20,Romance",
> >       "7,Drama",
> >       "7,Sci-Fi",
> >       // ... more lines in array ...
> >       "1680,Drama",
> >       "1680,Romance",
> >       "1681,Comedy"
> >     )
> >     lines
> >   }
> > }
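Worth noting: the context above is created with addMahoutJars = false, so mahoutSparkContext does not ship the Mahout jars to the executors itself. A minimal sketch of the variant that does, assuming MAHOUT_HOME points at a Mahout build on the driver machine (that addMahoutJars defaults to true in the 0.13.0 spark bindings is an assumption here):

// Sketch: let the Mahout spark bindings locate their own jars
// (found via MAHOUT_HOME) and register them with the SparkContext,
// so executors can deserialize Mahout classes in cluster mode.
implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
  masterUrl = sparkConf.get("spark.master"),
  appName = "recomender",
  sparkConf = sparkConf,
  addMahoutJars = true
)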
> > Error:
> >
> > 18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at
> > GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to
> > stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
> > Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0):
> > java.lang.IllegalStateException: unread block data
> >   at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> >   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> >   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > Driver stacktrace:
> > 18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7)
> > on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block
> > data) [duplicate 7]
> > 18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> > have all completed, from pool
> > 18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at
> > GenerateIndicator.scala:38, took 5.265593 s
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> > in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> > 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException:
> > unread block data
> >   at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> >   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> >   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > Thanks a lot for your time.
> > Cheers.
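An "unread block data" thrown while a task is deserialized on an executor usually means the executor's classpath does not match the driver's: either the workers run a different Spark version than the 2.1.2 the jar was built against, or classes serialized by the driver (here, plausibly the Mahout classes, since addMahoutJars = false and the jar is not an assembly) are missing on the workers. One way to probe the latter, using spark-submit's standard --jars flag (a sketch; the jar paths are assumptions to be replaced with the artifacts the Mahout build actually produced):

/opt/spark/bin/spark-submit \
  --class 'com.reco.GenerateIndicator' \
  --name recomender \
  --master spark://vagrant-ubuntu-trusty-64:7077 \
  --jars /path/to/mahout-spark_2.11-0.13.0.jar,/path/to/mahout-math-scala_2.11-0.13.0.jar,/path/to/mahout-math-0.13.0.jar,/path/to/mahout-hdfs-0.13.0.jar \
  target/scala-2.11/recomender-0.0.1.jar

If the job still fails with the same trace, comparing the Spark version installed on the workers against the 2.1.2 used at build time is the next thing to check.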