RE: DAGScheduler: Failed to run foreach
From: ilike...@gmail.com
Date: Mon, 23 Jun 2014 18:00:27 -0700
Subject: Re: DAGScheduler: Failed to run foreach
To: user@spark.apache.org
CC: u...@spark.incubator.apache.org

Please note that this:

    for (sentence <- sourcerdd) { ... }

is actually Scala syntactic sugar, which is converted into:

    sourcerdd.foreach { sentence => ... }

What this means is that the loop body will actually run on the cluster, which is probably not what you want if you're trying to print the sentences. Try this instead:

    for (sentence <- sourcerdd.toLocalIterator) { ... }

(By the way, the reason this was throwing a NotSerializableException was that you were trying to pass printScoreCanndedString as part of the job's closure. In Java, class methods have an implicit reference to this, so it tried to serialize the enclosing class CalculateScore, which is presumably not marked as Serializable.)
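The desugaring described above can be seen with plain Scala collections, no Spark required. This is a minimal local sketch (the helper names `collectWithFor` and `collectWithForeach` are made up for illustration): both functions compile to the same `foreach` call, which is why on an RDD the loop body becomes a task closure shipped to the executors.

```scala
import scala.collection.mutable.ArrayBuffer

// A for-loop over a collection is sugar for a .foreach call whose body
// becomes a closure. On an RDD, that closure is serialized and executed on
// the executors, so println output lands in executor logs, not your shell.
def collectWithFor(xs: Seq[String]): Seq[String] = {
  val out = ArrayBuffer.empty[String]
  for (sentence <- xs) { out += sentence } // sugar for the version below
  out.toSeq
}

def collectWithForeach(xs: Seq[String]): Seq[String] = {
  val out = ArrayBuffer.empty[String]
  xs.foreach { sentence => out += sentence } // what the compiler emits
  out.toSeq
}
```

`rdd.toLocalIterator` sidesteps this by streaming each partition back to the driver, so the loop body runs locally, one partition in memory at a time.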
On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak <ssti...@live.com> wrote:

The subject should be: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: and not DAGScheduler: Failed to run foreach. If I call printScoreCanndedString with a hard-coded string and an identical 2nd parameter, it works fine. However, for my application that is not sufficient.

From: ssti...@live.com
To: u...@spark.incubator.apache.org
Subject: DAGScheduler: Failed to run foreach
Date: Mon, 23 Jun 2014 17:05:03 -0700

Hi All,

I am using Spark for text analysis. I have a source file that has a few thousand sentences and a dataset of tens of millions of statements. I want to compare each statement from the source file with each statement from the dataset and generate a score. I am having the following problem and would really appreciate help.

Here is what I do within spark-shell:

    // Source file with a few thousand sentences.
    val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt")
    // Dataset with tens of millions of statements.
    val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt")
    // Initialize the score variable.
    val score = new mypackage.Score()
    // Generate the score.
    score.calculateScore(srcFile, destFile)

Here is the snippet from my Scala class (Score.scala):

    def calculateScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
      for (sentence <- sourcerdd) {
        println("Source String is: " + sentence + " Data Type is: " + sentence.getClass.getSimpleName)
        printScoreCanndedString(sentence, destrdd)
      }
    }

    def printScoreCanndedString(sourceStr: String, rdd: RDD[String]): RDD[Double] = {
      // Do the analysis here.
    }

The print statement displays the data correctly, along with the data type String as expected. However, I get the following error message when it tries to execute the printScoreCanndedString method. Any help with this will be great.
14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at CalculateScore.scala:51
14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0 (foreach at CalculateScore.scala:51)
14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at console:12), which has no missing parents
14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch.CalculateScore
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
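The java.io.NotSerializableException in the trace above can be reproduced without Spark: Java serialization refuses any object whose class does not implement Serializable, which is exactly what happens when a job closure drags in an enclosing class instance via `this`. A minimal sketch (the names `PlainScore`, `SerializableScore`, and `javaSerializable` are hypothetical stand-ins, not from the thread):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a class like CalculateScore that is not marked Serializable.
class PlainScore { def score(s: String): Double = s.length.toDouble }

// The usual fix: mark the class (or just the state the closure needs)
// as Serializable.
class SerializableScore extends Serializable {
  def score(s: String): Double = s.length.toDouble
}

// Attempts plain Java serialization, which is what Spark's closure
// serializer does by default; returns false if the object is rejected.
def javaSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }
```

An alternative to marking the whole class Serializable is to avoid capturing `this` at all, e.g. by copying the needed method into a local function or an object before referencing it inside the closure.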