Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-26 Thread Alonso Isidoro Roman
Thank you Cody, I will try to follow your advice.

Alonso Isidoro Roman
about.me/alonso.isidoro.roman



Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-26 Thread Cody Koeninger
Honestly, given this thread and the Stack Overflow thread, I'd say you need
to back up, start very simply, and learn Spark. If for some reason the
official docs aren't doing it for you, Learning Spark from O'Reilly is a
good book.

Given your specific question, why not just

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    someWorkOnAnIterator(iterator)
  }
}


All of the other extraneous stuff you're doing doesn't make any sense to
me.
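
Applied to the original code, a minimal sketch of that pattern (assuming the
same play-json AmazonRating format and the Recommender instance from the
posted snippet) would do the parsing and the prediction inside one output
action, registered before ssc.start():

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    // parse each (key, jsonRating) pair on the executor side
    val ratings = iterator.map { case (_, jsonRating) =>
      AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
        case JsSuccess(rating, _) => rating
        case JsError(_)           => AmazonRating.empty
      }
    }.filter(_ != AmazonRating.empty).toSeq

    if (ratings.nonEmpty) recommender.predictWithALS(ratings)
  }
}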




Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-26 Thread Alonso Isidoro Roman
Hi Matthias and Cody,

You can see in the code that StreamingContext.start() is called after the
messages.foreachRDD output action. Another problem, @Cody, is how I can avoid
the inner .foreachRDD(_.foreachPartition(it =>
recommender.predictWithALS(it.toSeq))) so that recommender.predictWithALS,
which runs a machine-learning ALS implementation, is invoked asynchronously
with each message from the Kafka topic.

In the current code I am not yet saving any data to the Mongo instance; for
now it is more important to focus on how to receive the messages from the
Kafka topic and feed the ALS implementation asynchronously. The Recommender
object will probably need the code to interact with the Mongo instance.

The idea of the process is to receive data from the Kafka topic, calculate
recommendations based on the incoming message, and save the results in a
Mongo instance (see the sketch after the code below). Is that possible? Am I
missing something important?

def main(args: Array[String]) {
  // Process program arguments and set properties
  if (args.length < 2) {
    System.err.println("Usage: " + this.getClass.getSimpleName + " ")
    System.exit(1)
  }

  val Array(brokers, topics) = args

  println("Initializing Streaming Spark Context and kafka connector...")
  // Create context with 2 second batch interval
  val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
    .setMaster("local[4]")
    .set("spark.driver.allowMultipleContexts", "true")

  val sc = new SparkContext(sparkConf)
  sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")

  val ssc = new StreamingContext(sparkConf, Seconds(2))
  //this checkpointdir should be in a conf file, for now it is hardcoded!
  val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"
  ssc.checkpoint(streamingCheckpointDir)

  // Create direct kafka stream with brokers and topics
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  val messages = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  println("Initialized Streaming Spark Context and kafka connector...")

  //create recommendation module
  println("Creating rating recommender module...")
  val ratingFile = "ratings.csv"
  val recommender = new Recommender(sc, ratingFile)
  println("Initialized rating recommender module...")

  //i have to convert messages, which is an InputDStream, into a Seq[AmazonRating]...
  try {
    messages.foreachRDD(rdd => {
      val count = rdd.count()
      if (count > 0) {
        //someMessages should be AmazonRating...
        val someMessages = rdd.take(count.toInt)
        println("<-->")
        println("someMessages is " + someMessages)
        someMessages.foreach(println)
        println("<-->")
        println("<---POSSIBLE SOLUTION--->")

        messages
          .map { case (_, jsonRating) =>
            val jsValue = Json.parse(jsonRating)
            AmazonRating.amazonRatingFormat.reads(jsValue) match {
              case JsSuccess(rating, _) => rating
              case JsError(_) => AmazonRating.empty
            }
          }
          .filter(_ != AmazonRating.empty)
          //probably is not a good idea to do this...
          .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))

        println("<---POSSIBLE SOLUTION--->")
      }
    })
  } catch {
    case e: IllegalArgumentException => println("illegal arg. exception")
    case e: IllegalStateException    => println("illegal state exception")
    case e: ClassCastException       => println("ClassCastException")
    case e: Exception                => println("Generic Exception")
  } finally {
    println("Finished taking data from kafka topic...")
  }

  //println("jsonParsed is " + jsonParsed)
  //The idea is to save results from Recommender.predict within mongodb, so I will
  //have to deal with this issue after resolving the issue of
  //.foreachRDD(_.foreachPartition(recommender.predict(_.toSeq)))

  ssc.start()
  ssc.awaitTermination()

  println("Finished!")
  }
}
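
For reference, a minimal sketch of the intended flow, with a hypothetical
saveRatingsToMongo helper standing in for the Mongo persistence that is not
written yet, would register the whole chain as one set of operations on
messages (no nested foreachRDD) and only then call ssc.start():

messages
  .map { case (_, jsonRating) =>
    AmazonRating.amazonRatingFormat.reads(Json.parse(jsonRating)) match {
      case JsSuccess(rating, _) => rating
      case JsError(_)           => AmazonRating.empty
    }
  }
  .filter(_ != AmazonRating.empty)
  .foreachRDD { rdd =>
    rdd.foreachPartition { ratings =>
      // run ALS on the ratings of this partition and persist the result
      val recommendations = recommender.predictWithALS(ratings.toSeq)
      saveRatingsToMongo(recommendations) // hypothetical helper, not implemented here
    }
  }

ssc.start()
ssc.awaitTermination()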

Thank you for reading this far; I need your assistance.

Regards


Alonso Isidoro Roman
about.me/alonso.isidoro.roman



Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-25 Thread Alonso Isidoro Roman
Hi Matthias and Cody, thanks for the answer. This is the code that is
raising the runtime exception:

try{
messages.foreachRDD( rdd =>{
  val count = rdd.count()
  if (count > 0){
//someMessages should be AmazonRating...
val someMessages = rdd.take(count.toInt)
println("<-->")
println("someMessages is " + someMessages)
someMessages.foreach(println)
println("<-->")
println("<---POSSIBLE SOLUTION--->")
messages
.map { case (_, jsonRating) =>
  val jsValue = Json.parse(jsonRating)
  AmazonRating.amazonRatingFormat.reads(jsValue) match {
case JsSuccess(rating, _) => rating
case JsError(_) => AmazonRating.empty
  }
 }
.filter(_ != AmazonRating.empty)
//this line raises the runtime error, but if I comment it out, a different
//runtime exception happens!
.foreachRDD(_.foreachPartition(it =>
recommender.predictWithALS(it.toSeq)))
println("<---POSSIBLE SOLUTION--->")
  }
  }
)
} catch {
  case e: IllegalArgumentException => println("illegal arg. exception")
  case e: IllegalStateException    => println("illegal state exception")
  case e: ClassCastException       => println("ClassCastException")
  case e: Exception                => println("Generic Exception")
} finally {
  println("Finished taking data from kafka topic...")
}

If I comment out the line with the second foreachRDD, the following runtime
exception happens on a fresh start, that is, as soon as the kafka producer
pushes data into the topic:

16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job
1464189972000 ms.0

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to [B

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:89)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

If I push more JSON data into the topic, the next exception happens:

16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at
KafkaRDD.scala:98, took 0,039689 s

<-->

someMessages is [Lscala.Tuple2;@712ca120

(null,{"userId":"someUserId","productId":"0981531679","rating":9.0})

<-->

<---POSSIBLE SOLUTION--->

16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job
1464190036000 ms.0 from job set of time 1464190036000 ms

16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time
1464190036000 ms (execution: 0,055 s)

16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list

16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job
1464190036000 ms.0

java.lang.IllegalStateException: Adding new inputs, transformations, and
output operations after starting a context is not supported

at
org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)

at org.apache.spark.streaming.dstream.DStream.(DStream.scala:64)

at
org.apache.spark.streaming.dstream.MappedDStream.(MappedDStream.scala:25)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)

at
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)

at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)

at
example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)

at
example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)

at

Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-25 Thread Matthias Niehoff
Hi,

you register some output actions (in this case foreachRDD) after starting
the streaming context. StreamingContext.start() has to be called after all
output actions have been registered.
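
In other words, a minimal sketch of the required ordering (using the names
from your code) looks like this:

val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// register every transformation and output action first...
messages.foreachRDD { rdd =>
  // ...and work only with the rdd handed to this closure
  println("received " + rdd.count() + " records")
}

// ...and only then start the context
ssc.start()
ssc.awaitTermination()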

2016-05-25 15:59 GMT+02:00 Alonso :

> Hi, I am receiving this exception when the direct Spark Streaming process
> tries to pull data from a Kafka topic:
>
> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 146416863
> ms saved to file
> 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-146416863',
> took 5928 bytes and 8 ms
>
> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 
> bytes result sent to driver
> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) 
> in 4 ms on localhost (1/1)
> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks 
> have all completed, from pool
> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at 
> KafkaRDD.scala:98) finished in 0,004 s
> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at 
> KafkaRDD.scala:98, took 0,008740 s
> <-->
> someMessages is [Lscala.Tuple2;@2641d687
> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
> <-->
> <---POSSIBLE SOLUTION--->
> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 146416863 
> ms.0 from job set of time 146416863 ms
> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 
> 146416863 ms (execution: 0,012 s)
> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 
> 146416863 ms.0
> java.lang.IllegalStateException: Adding new inputs, transformations, and 
> output operations after starting a context is not supported
>   at 
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>   at org.apache.spark.streaming.dstream.DStream.(DStream.scala:64)
>   at 
> org.apache.spark.streaming.dstream.MappedDStream.(MappedDStream.scala:25)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>   at 
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>   at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>   at 
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>   at 
> example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>   at scala.util.Try$.apply(Try.scala:161)
>   at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>   at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>   at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>   at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>
>
> This is the code that raises the exception 

Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-25 Thread Cody Koeninger
Am I reading this correctly that you're calling messages.foreachRDD inside
of the messages.foreachRDD block?  Don't do that.
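
That is, once inside the foreachRDD block, use only the rdd argument;
referring to messages again in there registers new DStream operations after
the context has started, which is what the IllegalStateException complains
about. A rough contrast:

// wrong: builds new DStream operations inside an output action
messages.foreachRDD { rdd =>
  messages.map { case (_, json) => json }.print()
}

// right: transform the rdd that is handed to the closure
messages.foreachRDD { rdd =>
  rdd.foreach { case (_, json) => println(json) }
}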
