Hi Matthias and Cody,

You can see in the code that StreamingContext.start() is called after the
messages.foreachRDD output action. Another problem @Cody is how can i avoid
the inner .foreachRDD(_.foreachPartition(it =>
recommender.predictWithALS(it.toSeq))) in order to invoke asynchronously
recommender.predictWithALS which runs a machine learning ALS implementation
with a message from the kafka topic?.

In the actual code i am not using for now any code to save data within the
mongo instance, for now, it is more important to be focus in how to receive
the message from the kafka topic and feeding asynchronously the ALS
implementation. Probably the Recommender object will need the code for
 interact with the mongo instance.

The idea of the process is to receive data from the kafka topic, calculate
its recommendations based on the incoming message and save the results
within a mongo instance. Is it possible?  Am i missing something important?

def main(args: Array[String]) {
    // Process program arguments and set properties

    if (args.length < 2) {
      System.err.println("Usage: " + this.getClass.getSimpleName + "
<brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    println("Initializing Streaming Spark Context and kafka connector...")
    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
                                   .setMaster("local[4]")

.set("spark.driver.allowMultipleContexts", "true")

    val sc = new SparkContext(sparkConf)

sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    //this checkpointdir should be in a conf file, for now it is hardcoded!
    val streamingCheckpointDir =
"/Users/aironman/my-recommendation-spark-engine/checkpoint"
    ssc.checkpoint(streamingCheckpointDir)

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    println("Initialized Streaming Spark Context and kafka connector...")

    //create recomendation module
    println("Creating rating recommender module...")
    val ratingFile= "ratings.csv"
    val recommender = new Recommender(sc,ratingFile)
    println("Initialized rating recommender module...")

    //i have to convert messages which is a InputDStream into a
Seq[AmazonRating]...
    try{
    messages.foreachRDD( rdd =>{
      val count = rdd.count()
      if (count > 0){
        //someMessages should be AmazonRating...
        val someMessages = rdd.take(count.toInt)
        println("<------>")
        println("someMessages is " + someMessages)
        someMessages.foreach(println)
        println("<------>")
        println("<---POSSIBLE SOLUTION--->")

        messages
        .map { case (_, jsonRating) =>
          val jsValue = Json.parse(jsonRating)
          AmazonRating.amazonRatingFormat.reads(jsValue) match {
            case JsSuccess(rating, _) => rating
            case JsError(_) => AmazonRating.empty
          }
             }
        .filter(_ != AmazonRating.empty)
        //probably is not a good idea to do this...
        .foreachRDD(_.foreachPartition(it =>
recommender.predictWithALS(it.toSeq)))

        println("<---POSSIBLE SOLUTION--->")

      }
      }
    )
    }catch{
      case e: IllegalArgumentException => {println("illegal arg.
exception")};
      case e: IllegalStateException    => {println("illegal state
exception")};
      case e: ClassCastException       => {println("ClassCastException")};
      case e: Exception                => {println(" Generic Exception")};
    }finally{

      println("Finished taking data from kafka topic...")
    }

    //println("jsonParsed is " + jsonParsed)
    //The idea is to save results from Recommender.predict within mongodb,
so i will have to deal with this issue
    //after resolving the issue of
.foreachRDD(_.foreachPartition(recommender.predict(_.toSeq)))

    *ssc.start()*
    ssc.awaitTermination()

    println("Finished!")
  }
}

Thank you for reading until here, please, i need your assistance.

Regards


Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2016-05-25 17:33 GMT+02:00 Alonso Isidoro Roman <alons...@gmail.com>:

> Hi Matthias and Cody, thanks for the answer. This is the code that is
> raising the runtime exception:
>
> try{
>     messages.foreachRDD( rdd =>{
>       val count = rdd.count()
>       if (count > 0){
>         //someMessages should be AmazonRating...
>         val someMessages = rdd.take(count.toInt)
>         println("<------>")
>         println("someMessages is " + someMessages)
>         someMessages.foreach(println)
>         println("<------>")
>         println("<---POSSIBLE SOLUTION--->")
>         messages
>         .map { case (_, jsonRating) =>
>           val jsValue = Json.parse(jsonRating)
>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>             case JsSuccess(rating, _) => rating
>             case JsError(_) => AmazonRating.empty
>           }
>              }
>         .filter(_ != AmazonRating.empty)
>         *//this line raises the runtime error, but if i comment it
> another different runtime exception happens!*
>         .foreachRDD(_.foreachPartition(it =>
> recommender.predictWithALS(it.toSeq)))
>         println("<---POSSIBLE SOLUTION--->")
>       }
>       }
>     )
>     }catch{
>       case e: IllegalArgumentException => {println("illegal arg.
> exception")};
>       case e: IllegalStateException    => {println("illegal state
> exception")};
>       case e: ClassCastException       => {println("ClassCastException")};
>       case e: Exception                => {println(" Generic Exception")};
>     }finally{
>
>       println("Finished taking data from kafka topic...")
>     }
>
> If i comment the line with the second foreachRDD, the next runtime
> exception happens within a fresh start, i mean, the kafka producer push
> data within the topic:
>
> 16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job
> 1464189972000 ms.0
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> If i push another json data within the topic, the next exception happens:
>
> 16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at
> KafkaRDD.scala:98, took 0,039689 s
>
> <------>
>
> someMessages is [Lscala.Tuple2;@712ca120
>
> (null,{"userId":"someUserId","productId":"0981531679","rating":9.0})
>
> <------>
>
> <---POSSIBLE SOLUTION--->
>
> 16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job
> 1464190036000 ms.0 from job set of time 1464190036000 ms
>
> 16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time
> 1464190036000 ms (execution: 0,055 s)
>
> 16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list
>
> 16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job
> 1464190036000 ms.0
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>
> at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>
> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>
> at
> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>
> at
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>
> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>
> at
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>
> at
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> If i decomment the second foreachRDD, this exception happens within a
> fresh start of the spark streaming process after the kafka producer sends
> data to the topic:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> With a second call of the kafka producer, rises this runtime exception:
>
> <------>
>
> someMessages is [Lscala.Tuple2;@581b83b
>
> (null,{"userId":"someUserId","productId":"0981531679","rating":8.0})
>
> <------>
>
> <---POSSIBLE SOLUTION--->
>
> 16/05/25 17:31:30 INFO JobScheduler: Finished job streaming job
> 1464190290000 ms.0 from job set of time 1464190290000 ms
>
> 16/05/25 17:31:30 INFO JobScheduler: Total delay: 0,066 s for time
> 1464190290000 ms (execution: 0,059 s)
>
> 16/05/25 17:31:30 INFO KafkaRDD: Removing RDD 37 from persistence list
>
> 16/05/25 17:31:30 ERROR JobScheduler: Error running job streaming job
> 1464190290000 ms.0
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>
> at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>
> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>
> at
> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>
> at
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>
> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>
> at
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>
> at
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> In your opinion, what changes do i have to do in order to get this code up
> and running correctly?
>
> The idea is to run every rating message that i receive from kafka topic in
> order to run recommender.predictWithALS method and save results within a
> mongo instance. I was thinking that this kind of task should be
> asynchronous, wasn't he? if i am right, how should i change the method to
> do such that way?
>
> Recommender.predictWithALS method:
>
> def predictWithALS(ratings: Seq[AmazonRating]) = {
>     // train model
>     val myRatings = ratings.map(toSparkRating)
>     val myRatingRDD = sc.parallelize(myRatings)
>
>     val startAls = DateTime.now
>     val model = ALS.train((sparkRatings ++
> myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>
>     val myProducts = myRatings.map(_.product).toSet
>     val candidates = sc.parallelize((0 until
> productDict.size).filterNot(myProducts.contains))
>
>     // get ratings of all products not in my history ordered by rating
> (higher first) and only keep the first NumRecommendations
>     val myUserId = userDict.getIndex(MyUsername)
>     val recommendations = model.predict(candidates.map((myUserId,
> _))).collect
>     val endAls = DateTime.now
>     val result =
> recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>
>     println(s"ALS Time: $alsTime seconds")
>     result
>   }
>
> thank you very much
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>
> 2016-05-25 16:54 GMT+02:00 Matthias Niehoff <
> matthias.nieh...@codecentric.de>:
>
>> Hi,
>>
>> you register some output actions (in this case foreachRDD) after starting
>> the streaming context. StreamingContext.start() has to be called after all!
>> output actions.
>>
>> 2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:
>>
>>> Hi, i am receiving this exception when direct spark streaming process
>>> tries to pull data from kafka topic:
>>>
>>> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time
>>> 1464168630000 ms saved to file
>>> 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000',
>>> took 5928 bytes and 8 ms
>>>
>>> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 
>>> 1041 bytes result sent to driver
>>> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 
>>> 2) in 4 ms on localhost (1/1)
>>> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks 
>>> have all completed, from pool
>>> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at 
>>> KafkaRDD.scala:98) finished in 0,004 s
>>> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at 
>>> KafkaRDD.scala:98, took 0,008740 s
>>> <------>
>>> someMessages is [Lscala.Tuple2;@2641d687
>>> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
>>> <------>
>>> <---POSSIBLE SOLUTION--->
>>> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 
>>> 1464168630000 ms.0 from job set of time 1464168630000 ms
>>> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
>>> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 
>>> 1464168630000 ms (execution: 0,012 s)
>>> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 
>>> 1464168630000 ms.0*java.lang.IllegalStateException: Adding new inputs, 
>>> transformations, and output operations after starting a context is not 
>>> supported
>>>     at* 
>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>     at 
>>> org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>     at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>     at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>     at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>     at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>     at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>     at 
>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>     at 
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>     at 
>>> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>     at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>     at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>     at 
>>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>     at 
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>     at scala.util.Try$.apply(Try.scala:161)
>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>     at 
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>     at 
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>     at 
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>     at 
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>>>
>>>
>>> This is the code that rises the exception in the spark streaming process:
>>>
>>> try{
>>>     messages.foreachRDD( rdd =>{
>>>       val count = rdd.count()
>>>       if (count > 0){
>>>         //someMessages should be AmazonRating...
>>>         val someMessages = rdd.take(count.toInt)
>>>         println("<------>")
>>>         println("someMessages is " + someMessages)
>>>         someMessages.foreach(println)
>>>         println("<------>")
>>>         println("<---POSSIBLE SOLUTION--->")
>>>         messages
>>>         .map { case (_, jsonRating) =>
>>>           val jsValue = Json.parse(jsonRating)
>>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>>             case JsSuccess(rating, _) => rating
>>>             case JsError(_) => AmazonRating.empty
>>>           }
>>>              }
>>>         .filter(_ != AmazonRating.empty)
>>>         *//I think that this line provokes the runtime exception...*
>>> *        .foreachRDD(_.foreachPartition(it =>
>>> recommender.predictWithALS(it.toSeq)))*
>>>
>>>         println("<---POSSIBLE SOLUTION--->")
>>>
>>>       }
>>>       }
>>>     )
>>>     }catch{
>>>       case e: IllegalArgumentException => {println("illegal arg.
>>> exception")};
>>>       case e: IllegalStateException    => {println("illegal state
>>> exception")};
>>>       case e: ClassCastException       =>
>>> {println("ClassCastException")};
>>>       case e: Exception                => {println(" Generic
>>> Exception")};
>>>     }finally{
>>>
>>>       println("Finished taking data from kafka topic...")
>>>     }
>>>
>>> Recommender object:
>>>
>>> *def predictWithALS(ratings: Seq[AmazonRating])* = {
>>>     // train model
>>>     val myRatings = ratings.map(toSparkRating)
>>>     val myRatingRDD = sc.parallelize(myRatings)
>>>
>>>     val startAls = DateTime.now
>>>     val model = ALS.train((sparkRatings ++
>>> myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>
>>>     val myProducts = myRatings.map(_.product).toSet
>>>     val candidates = sc.parallelize((0 until
>>> productDict.size).filterNot(myProducts.contains))
>>>
>>>     // get ratings of all products not in my history ordered by rating
>>> (higher first) and only keep the first NumRecommendations
>>>     val myUserId = userDict.getIndex(MyUsername)
>>>     val recommendations = model.predict(candidates.map((myUserId,
>>> _))).collect
>>>     val endAls = DateTime.now
>>>     val result =
>>> recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>
>>>     println(s"ALS Time: $alsTime seconds")
>>>     result
>>>   }
>>> }
>>>
>>> And this is the kafka producer that push the json data within the topic:
>>>
>>> object AmazonProducerExample {
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     val productId = args(0).toString
>>>     val userId = args(1).toString
>>>     val rating = args(2).toDouble
>>>     val topicName = "amazonRatingsTopic"
>>>
>>>     val producer = Producer[String](topicName)
>>>
>>>     //0981531679 is Scala Puzzlers...
>>>     //AmazonProductAndRating
>>>     AmazonPageParser.parse(productId,userId,rating).onSuccess { case
>>> amazonRating =>
>>>       //Is this the correct way? the best performance? possibly not,
>>> what about using avro or parquet?
>>>       producer.send(Json.toJson(amazonRating).toString)
>>>       //producer.send(amazonRating)
>>>       println("amazon product with rating sent to kafka cluster..." +
>>> amazonRating.toString)
>>>       System.exit(0)
>>>     }
>>>
>>>   }
>>> }
>>>
>>>
>>> I have written a stack overflow post
>>> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>,
>>> with more details, please help, i am stuck with this issue and i don't know
>>> how to continue.
>>>
>>> Regards
>>>
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>>
>>> ------------------------------
>>> View this message in context: about an exception when receiving data
>>> from kafka topic using Direct mode of Spark Streaming
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/about-an-exception-when-receiving-data-from-kafka-topic-using-Direct-mode-of-Spark-Streaming-tp27022.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>> E-Mail ist nicht gestattet
>>
>
>

Reply via email to