Hi,

you are registering output actions (in this case foreachRDD) after starting
the streaming context. StreamingContext.start() must be called only after
ALL output actions have been registered.
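
For illustration, here is a minimal sketch of that ordering. It reuses the
names from your snippet (messages, recommender, AmazonRating, Json); ssc is
assumed to be your StreamingContext, and the Kafka direct stream setup is
assumed to happen above this point:

    // All transformations are wired up first ...
    val ratings = messages
      .map { case (_, jsonRating) =>
        // same JSON parsing as in your snippet
        AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
          case JsSuccess(rating, _) => rating
          case JsError(_)           => AmazonRating.empty
        }
      }
      .filter(_ != AmazonRating.empty)

    // ... the output operation is registered here, still before start() ...
    ratings.foreachRDD { rdd =>
      rdd.foreachPartition(it => recommender.predictWithALS(it.toSeq))
    }

    // ... and only now is the context started.
    ssc.start()
    ssc.awaitTermination()

Alternatively, keep your existing messages.foreachRDD block and do the parsing
and filtering on the per-batch rdd inside it. The IllegalStateException is
thrown because the closure calls map and foreachRDD on the DStream (messages)
while the job is already running, i.e. after the context has been started.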

2016-05-25 15:59 GMT+02:00 Alonso <alons...@gmail.com>:

> Hi, i am receiving this exception when direct spark streaming process
> tries to pull data from kafka topic:
>
> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 1464168630000 ms saved to file 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000', took 5928 bytes and 8 ms
>
> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 bytes result sent to driver
> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4 ms on localhost (1/1)
> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at KafkaRDD.scala:98) finished in 0,004 s
> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at KafkaRDD.scala:98, took 0,008740 s
> <------>
> someMessages is [Lscala.Tuple2;@2641d687
> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
> <------>
> <---POSSIBLE SOLUTION--->
> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 1464168630000 ms.0 from job set of time 1464168630000 ms
> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 1464168630000 ms (execution: 0,012 s)
> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 1464168630000 ms.0
> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>       at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>       at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>       at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>       at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>       at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>       at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>       at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at scala.util.Try$.apply(Try.scala:161)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>
>
> This is the code that raises the exception in the Spark Streaming process:
>
> try {
>     messages.foreachRDD( rdd => {
>       val count = rdd.count()
>       if (count > 0) {
>         //someMessages should be AmazonRating...
>         val someMessages = rdd.take(count.toInt)
>         println("<------>")
>         println("someMessages is " + someMessages)
>         someMessages.foreach(println)
>         println("<------>")
>         println("<---POSSIBLE SOLUTION--->")
>         messages
>           .map { case (_, jsonRating) =>
>             val jsValue = Json.parse(jsonRating)
>             AmazonRating.amazonRatingFormat.reads(jsValue) match {
>               case JsSuccess(rating, _) => rating
>               case JsError(_) => AmazonRating.empty
>             }
>           }
>           .filter(_ != AmazonRating.empty)
>           //I think that this line provokes the runtime exception...
>           .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>
>         println("<---POSSIBLE SOLUTION--->")
>       }
>     })
>     } catch {
>       case e: IllegalArgumentException => println("illegal arg. exception")
>       case e: IllegalStateException    => println("illegal state exception")
>       case e: ClassCastException       => println("ClassCastException")
>       case e: Exception                => println(" Generic Exception")
>     } finally {
>       println("Finished taking data from kafka topic...")
>     }
>
> Recommender object:
>
> def predictWithALS(ratings: Seq[AmazonRating]) = {
>     // train model
>     val myRatings = ratings.map(toSparkRating)
>     val myRatingRDD = sc.parallelize(myRatings)
>
>     val startAls = DateTime.now
>     val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>
>     val myProducts = myRatings.map(_.product).toSet
>     val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>
>     // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>     val myUserId = userDict.getIndex(MyUsername)
>     val recommendations = model.predict(candidates.map((myUserId, _))).collect
>     val endAls = DateTime.now
>     val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>
>     println(s"ALS Time: $alsTime seconds")
>     result
>   }
> }
>
> And this is the Kafka producer that pushes the JSON data to the topic:
>
> object AmazonProducerExample {
>   def main(args: Array[String]): Unit = {
>
>     val productId = args(0).toString
>     val userId = args(1).toString
>     val rating = args(2).toDouble
>     val topicName = "amazonRatingsTopic"
>
>     val producer = Producer[String](topicName)
>
>     //0981531679 is Scala Puzzlers...
>     //AmazonProductAndRating
>     AmazonPageParser.parse(productId, userId, rating).onSuccess { case amazonRating =>
>       //Is this the correct way? the best performance? possibly not, what about using avro or parquet?
>       producer.send(Json.toJson(amazonRating).toString)
>       //producer.send(amazonRating)
>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>       System.exit(0)
>     }
>
>   }
> }
>
>
> I have written a Stack Overflow post
> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
> with more details. Please help; I am stuck on this issue and don't know how
> to continue.
>
> Regards
>
> Alonso Isidoro Roman
> https://about.me/alonso.isidoro.roman
>
>



-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

