Thank you Cody, I will try to follow your advice.

Alonso Isidoro Roman
about.me/alonso.isidoro.roman
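For reference, below is a minimal sketch of what the restructured job could look like, following Cody's advice: one SparkContext shared with the StreamingContext, the whole DStream graph defined before ssc.start(), and a single output action. It assumes the AmazonRating JSON format, the Recommender class, and the brokers/topics arguments from the code quoted further down; it is an illustration, not a drop-in replacement.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import play.api.libs.json.{JsError, JsSuccess, Json}

// One SparkContext, reused by the StreamingContext, so
// spark.driver.allowMultipleContexts is no longer needed.
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

val recommender = new Recommender(sc, "ratings.csv")

// brokers and topics parsed from args, as in the quoted main()
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> brokers), topics.split(",").toSet)

// Parse and filter at DStream level, once, before the context starts.
val ratings = messages
  .map { case (_, jsonRating) =>
    AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
      case JsSuccess(rating, _) => rating
      case JsError(_)           => AmazonRating.empty
    }
  }
  .filter(_ != AmazonRating.empty)

// The single output action. predictWithALS uses sc internally, so it
// has to run here on the driver, not inside foreachPartition.
ratings.foreachRDD { rdd =>
  val batch = rdd.collect().toSeq   // assumes small per-batch volumes
  if (batch.nonEmpty) recommender.predictWithALS(batch)
}

ssc.start()            // start only after every output action is registered
ssc.awaitTermination()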
2016-05-26 17:00 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Honestly, given this thread and the Stack Overflow thread, I'd say you
> need to back up, start very simply, and learn Spark. If for some reason
> the official docs aren't doing it for you, Learning Spark from O'Reilly
> is a good book.
>
> Given your specific question, why not just
>
> messages.foreachRDD { rdd =>
>   rdd.foreachPartition { iterator =>
>     someWorkOnAnIterator(iterator)
>   }
> }
>
> All of the other extraneous stuff you're doing doesn't make any sense to
> me.
>
> On Thu, May 26, 2016 at 2:48 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
>
>> Hi Matthias and Cody,
>>
>> You can see in the code that StreamingContext.start() is called after
>> the messages.foreachRDD output action. Another problem, @Cody: how can I
>> avoid the inner .foreachRDD(_.foreachPartition(it =>
>> recommender.predictWithALS(it.toSeq))) in order to invoke
>> recommender.predictWithALS asynchronously? It runs a machine learning
>> ALS implementation on a message from the Kafka topic.
>>
>> The actual code does not yet save any data to the Mongo instance; for
>> now it is more important to focus on how to receive the message from the
>> Kafka topic and feed the ALS implementation asynchronously. Probably the
>> Recommender object will need the code to interact with the Mongo
>> instance.
>>
>> The idea of the process is to receive data from the Kafka topic,
>> calculate its recommendations based on the incoming message, and save
>> the results in a Mongo instance. Is it possible? Am I missing something
>> important?
>>
>> def main(args: Array[String]) {
>>   // Process program arguments and set properties
>>   if (args.length < 2) {
>>     System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>")
>>     System.exit(1)
>>   }
>>
>>   val Array(brokers, topics) = args
>>
>>   println("Initializing Streaming Spark Context and kafka connector...")
>>   // Create context with 2 second batch interval
>>   val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
>>                                  .setMaster("local[4]")
>>                                  .set("spark.driver.allowMultipleContexts", "true")
>>
>>   val sc = new SparkContext(sparkConf)
>>   sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
>>   val ssc = new StreamingContext(sparkConf, Seconds(2))
>>   // this checkpoint dir should be in a conf file, for now it is hardcoded!
>>   val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"
>>   ssc.checkpoint(streamingCheckpointDir)
>>
>>   // Create direct kafka stream with brokers and topics
>>   val topicsSet = topics.split(",").toSet
>>   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>   val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>   println("Initialized Streaming Spark Context and kafka connector...")
>>
>>   // create recommendation module
>>   println("Creating rating recommender module...")
>>   val ratingFile = "ratings.csv"
>>   val recommender = new Recommender(sc, ratingFile)
>>   println("Initialized rating recommender module...")
>>
>>   // i have to convert messages, which is an InputDStream, into a Seq[AmazonRating]...
>>   try {
>>     messages.foreachRDD( rdd => {
>>       val count = rdd.count()
>>       if (count > 0) {
>>         // someMessages should be AmazonRating...
>>         val someMessages = rdd.take(count.toInt)
>>         println("<------>")
>>         println("someMessages is " + someMessages)
>>         someMessages.foreach(println)
>>         println("<------>")
>>         println("<---POSSIBLE SOLUTION--->")
>>
>>         messages
>>           .map { case (_, jsonRating) =>
>>             val jsValue = Json.parse(jsonRating)
>>             AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>               case JsSuccess(rating, _) => rating
>>               case JsError(_) => AmazonRating.empty
>>             }
>>           }
>>           .filter(_ != AmazonRating.empty)
>>           // probably is not a good idea to do this...
>>           .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>>
>>         println("<---POSSIBLE SOLUTION--->")
>>       }
>>     })
>>   } catch {
>>     case e: IllegalArgumentException => println("illegal arg. exception")
>>     case e: IllegalStateException => println("illegal state exception")
>>     case e: ClassCastException => println("ClassCastException")
>>     case e: Exception => println("Generic Exception")
>>   } finally {
>>     println("Finished taking data from kafka topic...")
>>   }
>>
>>   // println("jsonParsed is " + jsonParsed)
>>   // The idea is to save results from Recommender.predict within mongodb, so i will have to deal with this issue
>>   // after resolving the issue of .foreachRDD(_.foreachPartition(recommender.predict(_.toSeq)))
>>
>>   ssc.start()
>>   ssc.awaitTermination()
>>
>>   println("Finished!")
>> }
>>
>> Thank you for reading this far; please, I need your assistance.
>>
>> Regards
>>
>> Alonso Isidoro Roman
>> about.me/alonso.isidoro.roman
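On the asynchronous question raised above, one possible pattern is to keep the DStream graph itself synchronous and hand each collected micro-batch to a Future on the driver. A sketch, reusing the ratings stream and recommender from the sketch at the top of this message; note that fire-and-forget Futures drop backpressure and error propagation, so this is illustrative only:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

ratings.foreachRDD { rdd =>
  val batch = rdd.collect().toSeq          // materialize the batch on the driver
  if (batch.nonEmpty) {
    // Spark accepts jobs submitted from other driver threads, so the
    // ALS run can proceed while the next micro-batch is received.
    Future(recommender.predictWithALS(batch)).onComplete {
      case Success(result) => println("recommendations: " + result)
      case Failure(e)      => println("ALS job failed: " + e.getMessage)
    }
  }
}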
>>> exception")}; >>> case e: IllegalStateException => {println("illegal state >>> exception")}; >>> case e: ClassCastException => >>> {println("ClassCastException")}; >>> case e: Exception => {println(" Generic >>> Exception")}; >>> }finally{ >>> >>> println("Finished taking data from kafka topic...") >>> } >>> >>> If i comment the line with the second foreachRDD, the next runtime >>> exception happens within a fresh start, i mean, the kafka producer push >>> data within the topic: >>> >>> 16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job >>> 1464189972000 ms.0 >>> >>> org.apache.spark.SparkException: Job aborted due to stage failure: Task >>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage >>> 0.0 (TID 0, localhost): java.lang.ClassCastException: >>> org.apache.spark.util.SerializableConfiguration cannot be cast to [B >>> >>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) >>> >>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>> >>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> >>> at java.lang.Thread.run(Thread.java:745) >>> >>> If i push another json data within the topic, the next exception happens: >>> >>> 16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at >>> KafkaRDD.scala:98, took 0,039689 s >>> >>> <------> >>> >>> someMessages is [Lscala.Tuple2;@712ca120 >>> >>> (null,{"userId":"someUserId","productId":"0981531679","rating":9.0}) >>> >>> <------> >>> >>> <---POSSIBLE SOLUTION---> >>> >>> 16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job >>> 1464190036000 ms.0 from job set of time 1464190036000 ms >>> >>> 16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time >>> 1464190036000 ms (execution: 0,055 s) >>> >>> 16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list >>> >>> 16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job >>> 1464190036000 ms.0 >>> >>> java.lang.IllegalStateException: Adding new inputs, transformations, and >>> output operations after starting a context is not supported >>> >>> at >>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222) >>> >>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64) >>> >>> at >>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558) >>> >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) >>> >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) >>> >>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) >>> >>> at >>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260) >>> >>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557) >>> >>> at >>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125) >>> >>> at >>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
>>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) >>> >>> at scala.util.Try$.apply(Try.scala:161) >>> >>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) >>> >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> If i decomment the second foreachRDD, this exception happens within a >>> fresh start of the spark streaming process after the kafka producer sends >>> data to the topic: >>> >>> org.apache.spark.SparkException: Job aborted due to stage failure: Task >>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage >>> 0.0 (TID 0, localhost): java.lang.ClassCastException: >>> org.apache.spark.util.SerializableConfiguration cannot be cast to [B >>> >>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) >>> >>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>> >>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> With a second call of the kafka producer, rises this runtime exception: >>> >>> <------> >>> >>> someMessages is [Lscala.Tuple2;@581b83b >>> >>> (null,{"userId":"someUserId","productId":"0981531679","rating":8.0}) >>> >>> <------> >>> >>> <---POSSIBLE SOLUTION---> >>> >>> 16/05/25 17:31:30 INFO JobScheduler: Finished job streaming job >>> 1464190290000 ms.0 from job set of time 1464190290000 ms >>> >>> 16/05/25 17:31:30 INFO JobScheduler: Total delay: 0,066 s for time >>> 1464190290000 ms (execution: 0,059 s) >>> >>> 16/05/25 17:31:30 INFO KafkaRDD: Removing RDD 37 from persistence list >>> >>> 16/05/25 17:31:30 ERROR JobScheduler: Error running job streaming job >>> 1464190290000 ms.0 >>> >>> java.lang.IllegalStateException: Adding new inputs, 
transformations, and >>> output operations after starting a context is not supported >>> >>> at >>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222) >>> >>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64) >>> >>> at >>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558) >>> >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) >>> >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) >>> >>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) >>> >>> at >>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260) >>> >>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557) >>> >>> at >>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125) >>> >>> at >>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) >>> >>> at >>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) >>> >>> at >>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) >>> >>> at scala.util.Try$.apply(Try.scala:161) >>> >>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) >>> >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> >>> at >>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> >>> at java.lang.Thread.run(Thread.java:745) >>> >>> >>> >>> In your opinion, what changes do i have to do in order to get this code >>> up and running correctly? >>> >>> The idea is to run every rating message that i receive from kafka topic >>> in order to run recommender.predictWithALS method and save results within a >>> mongo instance. 
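The IllegalStateException traces above all point at the same cause that Matthias describes below: DStream operations (messages.map(...).foreachRDD(...)) are being created inside another foreachRDD closure, which registers new output operations after ssc.start(). A minimal contrast, where parse, isValid and handle are hypothetical placeholders for the JSON parsing, the empty-rating filter, and the ALS call:

// Wrong: creates a new DStream chain on every batch, after the
// context has started; this is what throws the IllegalStateException.
messages.foreachRDD { rdd =>
  messages.map(parse).filter(isValid).foreachRDD(r => handle(r))
}

// Right: transform the RDD that the closure already receives.
messages.foreachRDD { rdd =>
  handle(rdd.map(parse).filter(isValid))
}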
>>> The idea is to run recommender.predictWithALS for every rating message
>>> that I receive from the Kafka topic and save the results in a Mongo
>>> instance. I was thinking that this kind of task should be asynchronous,
>>> shouldn't it? If I am right, how should I change the method to work
>>> that way?
>>>
>>> Recommender.predictWithALS method:
>>>
>>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>>   // train model
>>>   val myRatings = ratings.map(toSparkRating)
>>>   val myRatingRDD = sc.parallelize(myRatings)
>>>
>>>   val startAls = DateTime.now
>>>   val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>
>>>   val myProducts = myRatings.map(_.product).toSet
>>>   val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>>>
>>>   // get ratings of all products not in my history, ordered by rating
>>>   // (higher first), and only keep the first NumRecommendations
>>>   val myUserId = userDict.getIndex(MyUsername)
>>>   val recommendations = model.predict(candidates.map((myUserId, _))).collect
>>>   val endAls = DateTime.now
>>>   val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>   val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>
>>>   println(s"ALS Time: $alsTime seconds")
>>>   result
>>> }
>>>
>>> Thank you very much
>>>
>>> Alonso Isidoro Roman
>>> about.me/alonso.isidoro.roman
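A side note on predictWithALS itself: it calls sc.parallelize and ALS.train, and a SparkContext is only usable on the driver, so the method cannot run inside foreachPartition, whose closure executes in executor tasks. Whatever the final shape of the job, the call has to happen driver-side, along these lines (parsedRatings is a hypothetical name for the parsed, filtered DStream):

parsedRatings.foreachRDD { rdd =>
  // collect() brings the batch to the driver, where sc is available
  val batch = rdd.collect().toSeq
  if (batch.nonEmpty) recommender.predictWithALS(batch)
}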
>>> 2016-05-25 16:54 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:
>>>
>>>> Hi,
>>>>
>>>> you register some output actions (in this case foreachRDD) after
>>>> starting the streaming context. StreamingContext.start() has to be
>>>> called after all(!) output actions.
>>>>
>>>> 2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:
>>>>
>>>>> Hi, I am receiving this exception when the direct Spark Streaming
>>>>> process tries to pull data from the Kafka topic:
>>>>>
>>>>> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 1464168630000 ms saved to file 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000', took 5928 bytes and 8 ms
>>>>> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 bytes result sent to driver
>>>>> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4 ms on localhost (1/1)
>>>>> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
>>>>> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at KafkaRDD.scala:98) finished in 0,004 s
>>>>> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at KafkaRDD.scala:98, took 0,008740 s
>>>>> <------>
>>>>> someMessages is [Lscala.Tuple2;@2641d687
>>>>> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
>>>>> <------>
>>>>> <---POSSIBLE SOLUTION--->
>>>>> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 1464168630000 ms.0 from job set of time 1464168630000 ms
>>>>> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
>>>>> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 1464168630000 ms (execution: 0,012 s)
>>>>> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 1464168630000 ms.0
>>>>> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>>>>> at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>> at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>>> at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>>> at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>>> at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>>> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>>> at scala.util.Try$.apply(Try.scala:161)
>>>>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>>>>>
>>>>> This is the code that raises the exception in the Spark Streaming
>>>>> process:
>>>>>
>>>>> try {
>>>>>   messages.foreachRDD( rdd => {
>>>>>     val count = rdd.count()
>>>>>     if (count > 0) {
>>>>>       // someMessages should be AmazonRating...
>>>>>       val someMessages = rdd.take(count.toInt)
>>>>>       println("<------>")
>>>>>       println("someMessages is " + someMessages)
>>>>>       someMessages.foreach(println)
>>>>>       println("<------>")
>>>>>       println("<---POSSIBLE SOLUTION--->")
>>>>>       messages
>>>>>         .map { case (_, jsonRating) =>
>>>>>           val jsValue = Json.parse(jsonRating)
>>>>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>>>>             case JsSuccess(rating, _) => rating
>>>>>             case JsError(_) => AmazonRating.empty
>>>>>           }
>>>>>         }
>>>>>         .filter(_ != AmazonRating.empty)
>>>>>         // I think that this line provokes the runtime exception...
>>>>>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>>>>>
>>>>>       println("<---POSSIBLE SOLUTION--->")
>>>>>     }
>>>>>   })
>>>>> } catch {
>>>>>   case e: IllegalArgumentException => println("illegal arg. exception")
>>>>>   case e: IllegalStateException => println("illegal state exception")
>>>>>   case e: ClassCastException => println("ClassCastException")
>>>>>   case e: Exception => println("Generic Exception")
>>>>> } finally {
>>>>>   println("Finished taking data from kafka topic...")
>>>>> }
>>>>>
>>>>> Recommender object:
>>>>>
>>>>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>>>>   // train model
>>>>>   val myRatings = ratings.map(toSparkRating)
>>>>>   val myRatingRDD = sc.parallelize(myRatings)
>>>>>
>>>>>   val startAls = DateTime.now
>>>>>   val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>>>
>>>>>   val myProducts = myRatings.map(_.product).toSet
>>>>>   val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>>>>>
>>>>>   // get ratings of all products not in my history, ordered by rating
>>>>>   // (higher first), and only keep the first NumRecommendations
>>>>>   val myUserId = userDict.getIndex(MyUsername)
>>>>>   val recommendations = model.predict(candidates.map((myUserId, _))).collect
>>>>>   val endAls = DateTime.now
>>>>>   val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>>>   val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>>>
>>>>>   println(s"ALS Time: $alsTime seconds")
>>>>>   result
>>>>> }
>>>>> }
>>>>>
>>>>> And this is the Kafka producer that pushes the JSON data into the
>>>>> topic:
>>>>>
>>>>> object AmazonProducerExample {
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     val productId = args(0).toString
>>>>>     val userId = args(1).toString
>>>>>     val rating = args(2).toDouble
>>>>>     val topicName = "amazonRatingsTopic"
>>>>>
>>>>>     val producer = Producer[String](topicName)
>>>>>
>>>>>     // 0981531679 is Scala Puzzlers...
>>>>>     // AmazonProductAndRating
>>>>>     AmazonPageParser.parse(productId, userId, rating).onSuccess { case amazonRating =>
>>>>>       // Is this the correct way? the best performance? possibly not; what about using avro or parquet?
>>>>>       producer.send(Json.toJson(amazonRating).toString)
>>>>>       // producer.send(amazonRating)
>>>>>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>>>>>       System.exit(0)
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> I have written a Stack Overflow post
>>>>> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
>>>>> with more details. Please help, I am stuck with this issue and I don't
>>>>> know how to continue.
>>>>>
>>>>> Regards
>>>>>
>>>>> Alonso Isidoro Roman
>>>>> about.me/alonso.isidoro.roman
>>>>>
>>>>> ------------------------------
>>>>> View this message in context: about an exception when receiving data
>>>>> from kafka topic using Direct mode of Spark Streaming
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/about-an-exception-when-receiving-data-from-kafka-topic-using-Direct-mode-of-Spark-Streaming-tp27022.html>
>>>>> Sent from the Apache Spark User List mailing list archive
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>
>>>> --
>>>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de