Hi Matthias and Cody, thanks for your answers. This is the code that raises the runtime exception:
try {
  messages.foreachRDD( rdd => {
    val count = rdd.count()
    if (count > 0) {
      // someMessages should be AmazonRating...
      val someMessages = rdd.take(count.toInt)
      println("<------>")
      println("someMessages is " + someMessages)
      someMessages.foreach(println)
      println("<------>")
      println("<---POSSIBLE SOLUTION--->")
      messages
        .map { case (_, jsonRating) =>
          val jsValue = Json.parse(jsonRating)
          AmazonRating.amazonRatingFormat.reads(jsValue) match {
            case JsSuccess(rating, _) => rating
            case JsError(_) => AmazonRating.empty
          }
        }
        .filter(_ != AmazonRating.empty)
        // this line raises the runtime error, but if I comment it out, a different runtime exception happens!
        .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
      println("<---POSSIBLE SOLUTION--->")
    }
  })
} catch {
  case e: IllegalArgumentException => println("illegal arg. exception")
  case e: IllegalStateException => println("illegal state exception")
  case e: ClassCastException => println("ClassCastException")
  case e: Exception => println("Generic Exception")
} finally {
  println("Finished taking data from kafka topic...")
}

If I comment out the line with the second foreachRDD, the following runtime exception happens on a fresh start, i.e. as soon as the kafka producer pushes data into the topic:

16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job 1464189972000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

If I then push another json message into the topic, this exception happens:

16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at KafkaRDD.scala:98, took 0,039689 s
<------>
someMessages is [Lscala.Tuple2;@712ca120
(null,{"userId":"someUserId","productId":"0981531679","rating":9.0})
<------>
<---POSSIBLE SOLUTION--->
16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job 1464190036000 ms.0 from job set of time 1464190036000 ms
16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time 1464190036000 ms (execution: 0,055 s)
16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list
16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job 1464190036000 ms.0
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
    at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
    at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

If I uncomment the second foreachRDD again, this exception happens on a fresh start of the spark streaming process, after the kafka producer sends data to the topic:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

With a second call of the kafka producer, this runtime exception is raised:

<------>
someMessages is [Lscala.Tuple2;@581b83b
(null,{"userId":"someUserId","productId":"0981531679","rating":8.0})
<------>
<---POSSIBLE SOLUTION--->
16/05/25 17:31:30 INFO JobScheduler: Finished job streaming job 1464190290000 ms.0 from job set of time 1464190290000 ms
16/05/25 17:31:30 INFO JobScheduler: Total delay: 0,066 s for time 1464190290000 ms (execution: 0,059 s)
16/05/25 17:31:30 INFO KafkaRDD: Removing RDD 37 from persistence list
16/05/25 17:31:30 ERROR JobScheduler: Error running job streaming job 1464190290000 ms.0
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
    at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
    at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

In your opinion, what changes do I have to make in order to get this code up and running correctly? The idea is to take every rating message that I receive from the kafka topic, run the recommender.predictWithALS method on it, and save the results in a mongo instance. I was thinking that this kind of task should be asynchronous, shouldn't it? If I am right, how should I change the method to work that way?
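Matthias, reading your hint again (that StreamingContext.start() has to be called after registering all output actions), is the following restructuring roughly what you mean? This is only an untested sketch of what I understood, and I am assuming here that my StreamingContext variable is called ssc: instead of calling messages.map(...).filter(...).foreachRDD(...) inside the first foreachRDD (which, if I understand it correctly, creates and registers a new DStream after the context has already started), I would parse and filter the rdd that the single foreachRDD already gives me, and keep ssc.start() / ssc.awaitTermination() as the last calls in main:

// untested sketch: only one output action, and it is registered before ssc.start()
messages.foreachRDD { rdd =>
  val ratings = rdd
    .map { case (_, jsonRating) =>
      val jsValue = Json.parse(jsonRating)
      AmazonRating.amazonRatingFormat.reads(jsValue) match {
        case JsSuccess(rating, _) => rating
        case JsError(_)           => AmazonRating.empty
      }
    }
    .filter(_ != AmazonRating.empty)

  // bring the (few) ratings of this micro-batch back to the driver, because
  // predictWithALS uses sc.parallelize and ALS.train, which can only run on
  // the driver and not inside foreachPartition on the executors
  val batch = ratings.collect().toSeq
  if (batch.nonEmpty) {
    recommender.predictWithALS(batch)
  }
}

// ssc is my StreamingContext (name assumed, defined earlier in main)
ssc.start()
ssc.awaitTermination()

I guess the call to predictWithALS could later be wrapped in a scala.concurrent.Future if I want it to run asynchronously, but first I would like to know whether this restructuring is the right direction.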
Recommender.predictWithALS method:

def predictWithALS(ratings: Seq[AmazonRating]) = {
  // train model
  val myRatings = ratings.map(toSparkRating)
  val myRatingRDD = sc.parallelize(myRatings)

  val startAls = DateTime.now
  val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)

  val myProducts = myRatings.map(_.product).toSet
  val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))

  // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
  val myUserId = userDict.getIndex(MyUsername)
  val recommendations = model.predict(candidates.map((myUserId, _))).collect
  val endAls = DateTime.now
  val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
  val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds

  println(s"ALS Time: $alsTime seconds")
  result
}

Thank you very much.

Alonso Isidoro Roman
about.me/alonso.isidoro.roman

2016-05-25 16:54 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:

> Hi,
>
> you register some output actions (in this case foreachRDD) after starting
> the streaming context. StreamingContext.start() has to be called after all!
> output actions.
>
> 2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:
>
>> Hi, i am receiving this exception when direct spark streaming process
>> tries to pull data from kafka topic:
>>
>> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 1464168630000 ms saved to file
>> 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000',
>> took 5928 bytes and 8 ms
>> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 bytes result sent to driver
>> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4 ms on localhost (1/1)
>> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
>> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at KafkaRDD.scala:98) finished in 0,004 s
>> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at KafkaRDD.scala:98, took 0,008740 s
>> <------>
>> someMessages is [Lscala.Tuple2;@2641d687
>> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
>> <------>
>> <---POSSIBLE SOLUTION--->
>> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 1464168630000 ms.0 from job set of time 1464168630000 ms
>> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
>> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 1464168630000 ms (execution: 0,012 s)
>> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 1464168630000 ms.0
>> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>> at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>> at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>> at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>> at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>> at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>>
>> This is the code that rises the exception in the spark streaming process:
>>
>> try{
>>   messages.foreachRDD( rdd =>{
>>     val count = rdd.count()
>>     if (count > 0){
>>       //someMessages should be AmazonRating...
>>       val someMessages = rdd.take(count.toInt)
>>       println("<------>")
>>       println("someMessages is " + someMessages)
>>       someMessages.foreach(println)
>>       println("<------>")
>>       println("<---POSSIBLE SOLUTION--->")
>>       messages
>>       .map { case (_, jsonRating) =>
>>         val jsValue = Json.parse(jsonRating)
>>         AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>           case JsSuccess(rating, _) => rating
>>           case JsError(_) => AmazonRating.empty
>>         }
>>       }
>>       .filter(_ != AmazonRating.empty)
>>       //I think that this line provokes the runtime exception...
>>       .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>>
>>       println("<---POSSIBLE SOLUTION--->")
>>     }
>>   })
>> }catch{
>>   case e: IllegalArgumentException => {println("illegal arg. exception")};
>>   case e: IllegalStateException => {println("illegal state exception")};
>>   case e: ClassCastException => {println("ClassCastException")};
>>   case e: Exception => {println(" Generic Exception")};
>> }finally{
>>   println("Finished taking data from kafka topic...")
>> }
>>
>> Recommender object:
>>
>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>   // train model
>>   val myRatings = ratings.map(toSparkRating)
>>   val myRatingRDD = sc.parallelize(myRatings)
>>
>>   val startAls = DateTime.now
>>   val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>
>>   val myProducts = myRatings.map(_.product).toSet
>>   val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>>
>>   // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>>   val myUserId = userDict.getIndex(MyUsername)
>>   val recommendations = model.predict(candidates.map((myUserId, _))).collect
>>   val endAls = DateTime.now
>>   val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>   val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>
>>   println(s"ALS Time: $alsTime seconds")
>>   result
>> }
>> }
>>
>> And this is the kafka producer that push the json data within the topic:
>>
>> object AmazonProducerExample {
>>   def main(args: Array[String]): Unit = {
>>
>>     val productId = args(0).toString
>>     val userId = args(1).toString
>>     val rating = args(2).toDouble
>>     val topicName = "amazonRatingsTopic"
>>
>>     val producer = Producer[String](topicName)
>>
>>     //0981531679 is Scala Puzzlers...
>>     //AmazonProductAndRating
>>     AmazonPageParser.parse(productId,userId,rating).onSuccess { case amazonRating =>
>>       //Is this the correct way? the best performance? possibly not, what about using avro or parquet?
>>       producer.send(Json.toJson(amazonRating).toString)
>>       //producer.send(amazonRating)
>>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>>       System.exit(0)
>>     }
>>
>>   }
>> }
>>
>> I have written a stack overflow post
>> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
>> with more details, please help, i am stuck with this issue and i don't know
>> how to continue.
>>
>> Regards
>>
>> Alonso Isidoro Roman
>> about.me/alonso.isidoro.roman
>>
>> ------------------------------
>> View this message in context: about an exception when receiving data
>> from kafka topic using Direct mode of Spark Streaming
>> <http://apache-spark-user-list.1001560.n3.nabble.com/about-an-exception-when-receiving-data-from-kafka-topic-using-Direct-mode-of-Spark-Streaming-tp27022.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland