I think I know what the problem is. Spark Streaming continuously cleans up
data that it no longer needs, based on the operations defined on the
DStream. Here the DStream operations are not aware of the Spark SQL queries
that run asynchronously to Spark Streaming, so the data is cleared before
the SQL queries complete, hence the block-not-found error. There is an easy
fix: call streamingContext.remember() to specify how long to keep all the
data around. If that duration is longer than the SQL queries need to run,
things should run fine.
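
A minimal sketch of that call, assuming the same 10-second batch interval as
in the code quoted below. The five-minute retention is only an illustrative
guess, and RememberSketch and createContext are hypothetical names, not part
of the original program:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object RememberSketch {
  // Builds a StreamingContext that retains each batch's RDDs for a fixed
  // period, so queries that outlive the batch still find their input blocks.
  def createContext(conf: SparkConf): StreamingContext = {
    // 10-second batches, matching the program in this thread.
    val ssc = new StreamingContext(conf, Seconds(10))
    // Assumption: five minutes is longer than the slowest SQL query takes.
    ssc.remember(Minutes(5))
    ssc
  }
}
```

As far as I know, remember() should be called while setting up the context,
before ssc.start(). A longer retention keeps more data in memory, so pick a
value only a bit above the longest query time you expect.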

Let me know if this helps.

TD


On Wed, Jul 16, 2014 at 9:50 AM, Yin Huai <yh...@databricks.com> wrote:

> Hi Srinivas,
>
> It seems the query you used is val results = sqlContext.sql("select type
> from table1"). However, table1 does not have a field called type. The
> schema of table1 comes from the definition of your case class Record (i.e.,
> ID, name, score, and school are the fields of table1). Can you change your
> query and see if your program works?
>
> Thanks,
>
> Yin
>
>
> On Wed, Jul 16, 2014 at 8:25 AM, srinivas <kusamsrini...@gmail.com> wrote:
>
>> Hi TD,
>>   I defined the case class outside the main method and was able to compile
>> the code successfully, but I am getting a runtime error when trying to
>> process some JSON file from Kafka. Here is the code I was able to compile:
>>
>> import java.util.Properties
>> import kafka.producer._
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka._
>> import org.apache.spark.streaming.StreamingContext._
>> import org.apache.spark.SparkConf
>> import scala.util.parsing.json.JSON
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> case class Record(ID:String,name:String,score:String,school:String)
>> object KafkaWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 4) {
>>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>>       System.exit(1)
>>     }
>>
>>    //StreamingExamples.setStreamingLogLevels()
>>
>>     val Array(zkQuorum, group, topics, numThreads) = args
>>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>     val sql = new SparkContext(sparkConf)
>>     val sqlContext = new SQLContext(sql)
>>     val timer = Time(10000)
>>    // ssc.checkpoint("checkpoint")
>>
>>     import sqlContext._
>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
>>     val jsonf = lines.map(JSON.parseFull(_))
>>       .map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]])
>>     val fields = jsonf.map(data =>
>>       Record(data("ID").toString, data("name").toString, data("score").toString, data("school").toString))
>>     fields.print()
>>     val results = fields.foreachRDD((recrdd,tt) => {
>>       recrdd.registerAsTable("table1")
>>       val results =sqlContext.sql("select type from table1")
>>       println(results)
>>       results.foreach(println)
>>       results.map(t => "Type:" +t(0)).collect().foreach(println)
>>     })
>>     //results.print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Here is the error I get when trying to process some data:
>>  == Query Plan ==
>> Project ['type]
>>  ExistingRdd [ID#60,name#61,score#62,school#63], MapPartitionsRDD[111] at mapPartitions at basicOperators.scala:174)
>> 14/07/16 14:34:10 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
>> 14/07/16 14:34:10 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
>> 14/07/16 14:34:10 INFO TaskSetManager: Serialized task 1.0:0 as 2710 bytes in 0 ms
>> 14/07/16 14:34:10 INFO Executor: Running task ID 1
>> 14/07/16 14:34:10 ERROR Executor: Exception in task ID 1
>> java.lang.Exception: Could not compute split, block input-0-1405521243800 not found
>>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> 14/07/16 14:34:10 WARN TaskSetManager: Lost TID 1 (task 1.0:0)
>> 14/07/16 14:34:10 WARN TaskSetManager: Loss was due to java.lang.Exception
>> java.lang.Exception: Could not compute split, block input-0-1405521243800 not found
>>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> 14/07/16 14:34:10 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
>> 14/07/16 14:34:10 INFO DAGScheduler: Failed to run foreach at jsonfile.scala:42
>> 14/07/16 14:34:10 ERROR JobScheduler: Error running job streaming job 1405521250000 ms.1
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.lang.Exception: Could not compute split, block input-0-1405521243800 not found
>>         org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace:
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>         at scala.Option.foreach(Option.scala:236)
>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> I am sending data to Kafka in this form:
>> {"type":"math","name":"srinivas","score":"10","school":"lfs"}
>>
>> I suspect something is wrong with the input RDD. Please let me know what is
>> causing this error.
>>
>> Thanks,
>> -Srinivas.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
