Thank you Cody, i will try to follow your advice.

Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2016-05-26 17:00 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Honestly given this thread, and the stack overflow thread, I'd say you
> need to back up, start very simply, and learn spark.  If for some reason
> the official docs aren't doing it for you, learning spark from oreilly is a
> good book.
>
> Given your specific question, why not just
>
> messages.foreachRDD { rdd =>
> rdd.foreachPartition { iterator =>
>   someWorkOnAnIterator(iterator)
>
>
> All of the other extraneous stuff you're doing doesn't make any sense to
> me.
>
>
>
> On Thu, May 26, 2016 at 2:48 AM, Alonso Isidoro Roman <alons...@gmail.com>
> wrote:
>
>> Hi Matthias and Cody,
>>
>> You can see in the code that StreamingContext.start() is called after the
>> messages.foreachRDD output action. Another problem @Cody is how can i avoid
>> the inner .foreachRDD(_.foreachPartition(it =>
>> recommender.predictWithALS(it.toSeq))) in order to invoke asynchronously
>> recommender.predictWithALS which runs a machine learning ALS implementation
>> with a message from the kafka topic?.
>>
>> In the actual code i am not using for now any code to save data within
>> the mongo instance, for now, it is more important to be focus in how to
>> receive the message from the kafka topic and feeding asynchronously the ALS
>> implementation. Probably the Recommender object will need the code for
>>  interact with the mongo instance.
>>
>> The idea of the process is to receive data from the kafka topic,
>> calculate its recommendations based on the incoming message and save the
>> results within a mongo instance. Is it possible?  Am i missing something
>> important?
>>
>> def main(args: Array[String]) {
>>     // Process program arguments and set properties
>>
>>     if (args.length < 2) {
>>       System.err.println("Usage: " + this.getClass.getSimpleName + "
>> <brokers> <topics>")
>>       System.exit(1)
>>     }
>>
>>     val Array(brokers, topics) = args
>>
>>     println("Initializing Streaming Spark Context and kafka connector...")
>>     // Create context with 2 second batch interval
>>     val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
>>                                    .setMaster("local[4]")
>>
>> .set("spark.driver.allowMultipleContexts", "true")
>>
>>     val sc = new SparkContext(sparkConf)
>>
>> sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
>>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>>     //this checkpointdir should be in a conf file, for now it is
>> hardcoded!
>>     val streamingCheckpointDir =
>> "/Users/aironman/my-recommendation-spark-engine/checkpoint"
>>     ssc.checkpoint(streamingCheckpointDir)
>>
>>     // Create direct kafka stream with brokers and topics
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String]("metadata.broker.list" ->
>> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>     println("Initialized Streaming Spark Context and kafka connector...")
>>
>>     //create recomendation module
>>     println("Creating rating recommender module...")
>>     val ratingFile= "ratings.csv"
>>     val recommender = new Recommender(sc,ratingFile)
>>     println("Initialized rating recommender module...")
>>
>>     //i have to convert messages which is a InputDStream into a
>> Seq[AmazonRating]...
>>     try{
>>     messages.foreachRDD( rdd =>{
>>       val count = rdd.count()
>>       if (count > 0){
>>         //someMessages should be AmazonRating...
>>         val someMessages = rdd.take(count.toInt)
>>         println("<------>")
>>         println("someMessages is " + someMessages)
>>         someMessages.foreach(println)
>>         println("<------>")
>>         println("<---POSSIBLE SOLUTION--->")
>>
>>         messages
>>         .map { case (_, jsonRating) =>
>>           val jsValue = Json.parse(jsonRating)
>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>             case JsSuccess(rating, _) => rating
>>             case JsError(_) => AmazonRating.empty
>>           }
>>              }
>>         .filter(_ != AmazonRating.empty)
>>         //probably is not a good idea to do this...
>>         .foreachRDD(_.foreachPartition(it =>
>> recommender.predictWithALS(it.toSeq)))
>>
>>         println("<---POSSIBLE SOLUTION--->")
>>
>>       }
>>       }
>>     )
>>     }catch{
>>       case e: IllegalArgumentException => {println("illegal arg.
>> exception")};
>>       case e: IllegalStateException    => {println("illegal state
>> exception")};
>>       case e: ClassCastException       => {println("ClassCastException")};
>>       case e: Exception                => {println(" Generic Exception")};
>>     }finally{
>>
>>       println("Finished taking data from kafka topic...")
>>     }
>>
>>     //println("jsonParsed is " + jsonParsed)
>>     //The idea is to save results from Recommender.predict within
>> mongodb, so i will have to deal with this issue
>>     //after resolving the issue of
>> .foreachRDD(_.foreachPartition(recommender.predict(_.toSeq)))
>>
>>     *ssc.start()*
>>     ssc.awaitTermination()
>>
>>     println("Finished!")
>>   }
>> }
>>
>> Thank you for reading until here, please, i need your assistance.
>>
>> Regards
>>
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>
>> 2016-05-25 17:33 GMT+02:00 Alonso Isidoro Roman <alons...@gmail.com>:
>>
>>> Hi Matthias and Cody, thanks for the answer. This is the code that is
>>> raising the runtime exception:
>>>
>>> try{
>>>     messages.foreachRDD( rdd =>{
>>>       val count = rdd.count()
>>>       if (count > 0){
>>>         //someMessages should be AmazonRating...
>>>         val someMessages = rdd.take(count.toInt)
>>>         println("<------>")
>>>         println("someMessages is " + someMessages)
>>>         someMessages.foreach(println)
>>>         println("<------>")
>>>         println("<---POSSIBLE SOLUTION--->")
>>>         messages
>>>         .map { case (_, jsonRating) =>
>>>           val jsValue = Json.parse(jsonRating)
>>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>>             case JsSuccess(rating, _) => rating
>>>             case JsError(_) => AmazonRating.empty
>>>           }
>>>              }
>>>         .filter(_ != AmazonRating.empty)
>>>         *//this line raises the runtime error, but if i comment it
>>> another different runtime exception happens!*
>>>         .foreachRDD(_.foreachPartition(it =>
>>> recommender.predictWithALS(it.toSeq)))
>>>         println("<---POSSIBLE SOLUTION--->")
>>>       }
>>>       }
>>>     )
>>>     }catch{
>>>       case e: IllegalArgumentException => {println("illegal arg.
>>> exception")};
>>>       case e: IllegalStateException    => {println("illegal state
>>> exception")};
>>>       case e: ClassCastException       =>
>>> {println("ClassCastException")};
>>>       case e: Exception                => {println(" Generic
>>> Exception")};
>>>     }finally{
>>>
>>>       println("Finished taking data from kafka topic...")
>>>     }
>>>
>>> If i comment the line with the second foreachRDD, the next runtime
>>> exception happens within a fresh start, i mean, the kafka producer push
>>> data within the topic:
>>>
>>> 16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job
>>> 1464189972000 ms.0
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 0.0 (TID 0, localhost): java.lang.ClassCastException:
>>> org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>>>
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> If i push another json data within the topic, the next exception happens:
>>>
>>> 16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at
>>> KafkaRDD.scala:98, took 0,039689 s
>>>
>>> <------>
>>>
>>> someMessages is [Lscala.Tuple2;@712ca120
>>>
>>> (null,{"userId":"someUserId","productId":"0981531679","rating":9.0})
>>>
>>> <------>
>>>
>>> <---POSSIBLE SOLUTION--->
>>>
>>> 16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job
>>> 1464190036000 ms.0 from job set of time 1464190036000 ms
>>>
>>> 16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time
>>> 1464190036000 ms (execution: 0,055 s)
>>>
>>> 16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list
>>>
>>> 16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job
>>> 1464190036000 ms.0
>>>
>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>> output operations after starting a context is not supported
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>
>>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>
>>> at
>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>
>>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>
>>> at
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>
>>> at
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>
>>> at scala.util.Try$.apply(Try.scala:161)
>>>
>>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> If i decomment the second foreachRDD, this exception happens within a
>>> fresh start of the spark streaming process after the kafka producer sends
>>> data to the topic:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 0.0 (TID 0, localhost): java.lang.ClassCastException:
>>> org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>>>
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> With a second call of the kafka producer, rises this runtime exception:
>>>
>>> <------>
>>>
>>> someMessages is [Lscala.Tuple2;@581b83b
>>>
>>> (null,{"userId":"someUserId","productId":"0981531679","rating":8.0})
>>>
>>> <------>
>>>
>>> <---POSSIBLE SOLUTION--->
>>>
>>> 16/05/25 17:31:30 INFO JobScheduler: Finished job streaming job
>>> 1464190290000 ms.0 from job set of time 1464190290000 ms
>>>
>>> 16/05/25 17:31:30 INFO JobScheduler: Total delay: 0,066 s for time
>>> 1464190290000 ms (execution: 0,059 s)
>>>
>>> 16/05/25 17:31:30 INFO KafkaRDD: Removing RDD 37 from persistence list
>>>
>>> 16/05/25 17:31:30 ERROR JobScheduler: Error running job streaming job
>>> 1464190290000 ms.0
>>>
>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>> output operations after starting a context is not supported
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>
>>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>
>>> at
>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>
>>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>
>>> at
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>
>>> at
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>
>>> at scala.util.Try$.apply(Try.scala:161)
>>>
>>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> In your opinion, what changes do i have to do in order to get this code
>>> up and running correctly?
>>>
>>> The idea is to run every rating message that i receive from kafka topic
>>> in order to run recommender.predictWithALS method and save results within a
>>> mongo instance. I was thinking that this kind of task should be
>>> asynchronous, wasn't he? if i am right, how should i change the method to
>>> do such that way?
>>>
>>> Recommender.predictWithALS method:
>>>
>>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>>     // train model
>>>     val myRatings = ratings.map(toSparkRating)
>>>     val myRatingRDD = sc.parallelize(myRatings)
>>>
>>>     val startAls = DateTime.now
>>>     val model = ALS.train((sparkRatings ++
>>> myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>
>>>     val myProducts = myRatings.map(_.product).toSet
>>>     val candidates = sc.parallelize((0 until
>>> productDict.size).filterNot(myProducts.contains))
>>>
>>>     // get ratings of all products not in my history ordered by rating
>>> (higher first) and only keep the first NumRecommendations
>>>     val myUserId = userDict.getIndex(MyUsername)
>>>     val recommendations = model.predict(candidates.map((myUserId,
>>> _))).collect
>>>     val endAls = DateTime.now
>>>     val result =
>>> recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>
>>>     println(s"ALS Time: $alsTime seconds")
>>>     result
>>>   }
>>>
>>> thank you very much
>>>
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>>
>>> 2016-05-25 16:54 GMT+02:00 Matthias Niehoff <
>>> matthias.nieh...@codecentric.de>:
>>>
>>>> Hi,
>>>>
>>>> you register some output actions (in this case foreachRDD) after
>>>> starting the streaming context. StreamingContext.start() has to be called
>>>> after all! output actions.
>>>>
>>>> 2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:
>>>>
>>>>> Hi, i am receiving this exception when direct spark streaming process
>>>>> tries to pull data from kafka topic:
>>>>>
>>>>> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time
>>>>> 1464168630000 ms saved to file
>>>>> 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000',
>>>>> took 5928 bytes and 8 ms
>>>>>
>>>>> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 
>>>>> 1041 bytes result sent to driver
>>>>> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 
>>>>> (TID 2) in 4 ms on localhost (1/1)
>>>>> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose 
>>>>> tasks have all completed, from pool
>>>>> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at 
>>>>> KafkaRDD.scala:98) finished in 0,004 s
>>>>> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at 
>>>>> KafkaRDD.scala:98, took 0,008740 s
>>>>> <------>
>>>>> someMessages is [Lscala.Tuple2;@2641d687
>>>>> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
>>>>> <------>
>>>>> <---POSSIBLE SOLUTION--->
>>>>> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 
>>>>> 1464168630000 ms.0 from job set of time 1464168630000 ms
>>>>> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
>>>>> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 
>>>>> 1464168630000 ms (execution: 0,012 s)
>>>>> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 
>>>>> 1464168630000 ms.0*java.lang.IllegalStateException: Adding new inputs, 
>>>>> transformations, and output operations after starting a context is not 
>>>>> supported
>>>>>   at* 
>>>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>>>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>>>   at 
>>>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>>>   at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>>>   at 
>>>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>>>   at 
>>>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>>>   at 
>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>>>   at scala.util.Try$.apply(Try.scala:161)
>>>>>   at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>>>   at 
>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>>>   at 
>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>>>   at 
>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>   at 
>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>>>>>
>>>>>
>>>>> This is the code that rises the exception in the spark streaming
>>>>> process:
>>>>>
>>>>> try{
>>>>>     messages.foreachRDD( rdd =>{
>>>>>       val count = rdd.count()
>>>>>       if (count > 0){
>>>>>         //someMessages should be AmazonRating...
>>>>>         val someMessages = rdd.take(count.toInt)
>>>>>         println("<------>")
>>>>>         println("someMessages is " + someMessages)
>>>>>         someMessages.foreach(println)
>>>>>         println("<------>")
>>>>>         println("<---POSSIBLE SOLUTION--->")
>>>>>         messages
>>>>>         .map { case (_, jsonRating) =>
>>>>>           val jsValue = Json.parse(jsonRating)
>>>>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>>>>             case JsSuccess(rating, _) => rating
>>>>>             case JsError(_) => AmazonRating.empty
>>>>>           }
>>>>>              }
>>>>>         .filter(_ != AmazonRating.empty)
>>>>>         *//I think that this line provokes the runtime exception...*
>>>>> *        .foreachRDD(_.foreachPartition(it =>
>>>>> recommender.predictWithALS(it.toSeq)))*
>>>>>
>>>>>         println("<---POSSIBLE SOLUTION--->")
>>>>>
>>>>>       }
>>>>>       }
>>>>>     )
>>>>>     }catch{
>>>>>       case e: IllegalArgumentException => {println("illegal arg.
>>>>> exception")};
>>>>>       case e: IllegalStateException    => {println("illegal state
>>>>> exception")};
>>>>>       case e: ClassCastException       =>
>>>>> {println("ClassCastException")};
>>>>>       case e: Exception                => {println(" Generic
>>>>> Exception")};
>>>>>     }finally{
>>>>>
>>>>>       println("Finished taking data from kafka topic...")
>>>>>     }
>>>>>
>>>>> Recommender object:
>>>>>
>>>>> *def predictWithALS(ratings: Seq[AmazonRating])* = {
>>>>>     // train model
>>>>>     val myRatings = ratings.map(toSparkRating)
>>>>>     val myRatingRDD = sc.parallelize(myRatings)
>>>>>
>>>>>     val startAls = DateTime.now
>>>>>     val model = ALS.train((sparkRatings ++
>>>>> myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>>>
>>>>>     val myProducts = myRatings.map(_.product).toSet
>>>>>     val candidates = sc.parallelize((0 until
>>>>> productDict.size).filterNot(myProducts.contains))
>>>>>
>>>>>     // get ratings of all products not in my history ordered by rating
>>>>> (higher first) and only keep the first NumRecommendations
>>>>>     val myUserId = userDict.getIndex(MyUsername)
>>>>>     val recommendations = model.predict(candidates.map((myUserId,
>>>>> _))).collect
>>>>>     val endAls = DateTime.now
>>>>>     val result =
>>>>> recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>>>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>>>
>>>>>     println(s"ALS Time: $alsTime seconds")
>>>>>     result
>>>>>   }
>>>>> }
>>>>>
>>>>> And this is the kafka producer that push the json data within the
>>>>> topic:
>>>>>
>>>>> object AmazonProducerExample {
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     val productId = args(0).toString
>>>>>     val userId = args(1).toString
>>>>>     val rating = args(2).toDouble
>>>>>     val topicName = "amazonRatingsTopic"
>>>>>
>>>>>     val producer = Producer[String](topicName)
>>>>>
>>>>>     //0981531679 is Scala Puzzlers...
>>>>>     //AmazonProductAndRating
>>>>>     AmazonPageParser.parse(productId,userId,rating).onSuccess { case
>>>>> amazonRating =>
>>>>>       //Is this the correct way? the best performance? possibly not,
>>>>> what about using avro or parquet?
>>>>>       producer.send(Json.toJson(amazonRating).toString)
>>>>>       //producer.send(amazonRating)
>>>>>       println("amazon product with rating sent to kafka cluster..." +
>>>>> amazonRating.toString)
>>>>>       System.exit(0)
>>>>>     }
>>>>>
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> I have written a stack overflow post
>>>>> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>,
>>>>> with more details, please help, i am stuck with this issue and i don't 
>>>>> know
>>>>> how to continue.
>>>>>
>>>>> Regards
>>>>>
>>>>> Alonso Isidoro Roman
>>>>> [image: https://]about.me/alonso.isidoro.roman
>>>>>
>>>>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>>>>
>>>>> ------------------------------
>>>>> View this message in context: about an exception when receiving data
>>>>> from kafka topic using Direct mode of Spark Streaming
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/about-an-exception-when-receiving-data-from-kafka-topic-using-Direct-mode-of-Spark-Streaming-tp27022.html>
>>>>> Sent from the Apache Spark User List mailing list archive
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>>>> 172.1702676
>>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>>>> www.more4fi.de
>>>>
>>>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>>>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>>>> Schütz
>>>>
>>>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>>>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>>>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>>>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>>>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>>>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>>>> E-Mail ist nicht gestattet
>>>>
>>>
>>>
>>
>

Reply via email to