Hi Srinivas,

Seems the query you used is val results =sqlContext.sql("select type from
table1"). However, table1 does not have a field called type. The schema of
table1 is defined as the class definition of your case class Record (i.e. ID,
name, score, and school are fields of your table1). Can you change your
query and see if your program works?

Thanks,

Yin


On Wed, Jul 16, 2014 at 8:25 AM, srinivas <kusamsrini...@gmail.com> wrote:

> Hi TD,
>   I Defines the Case Class outside the main method and was able to compile
> the code successfully. But getting a run time error when trying to process
> some json file from kafka. here is the code i an to compile
>
> import java.util.Properties
> import kafka.producer._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.SparkConf
> import scala.util.parsing.json.JSON
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> case class Record(ID:String,name:String,score:String,school:String)
> object KafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics>
> <numThreads>")
>       System.exit(1)
>     }
>
>    //StreamingExamples.setStreamingLogLevels()
>
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sql = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sql)
>     val timer = Time(10000)
>    // ssc.checkpoint("checkpoint")
>
> import sqlContext._
>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>      val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).map(_._2)
>      val jsonf =
>
> lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]])
> val fields =
>
> jsonf.map(data=>Record(data("ID").toString,data("name").toString,data("score").toString,data("school").toString))
> fields.print()
> val results = fields.foreachRDD((recrdd,tt) => {
> recrdd.registerAsTable("table1")
> val results =sqlContext.sql("select type from table1")
> println(results)
> results.foreach(println)
> results.map(t => "Type:" +t(0)).collect().foreach(println)
> })
> //results.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> and here is the error i am getting when trying to process some data
>  == Query Plan ==
> Project ['type]
>  ExistingRdd [ID#60,name#61,score#62,school#63], MapPartitionsRDD[111] at
> mapPartitions at basicOperators.scala:174)
> 14/07/16 14:34:10 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
> 14/07/16 14:34:10 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/16 14:34:10 INFO TaskSetManager: Serialized task 1.0:0 as 2710 bytes
> in 0 ms
> 14/07/16 14:34:10 INFO Executor: Running task ID 1
> 14/07/16 14:34:10 ERROR Executor: Exception in task ID 1
> java.lang.Exception: Could not compute split, block input-0-1405521243800
> not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 14/07/16 14:34:10 WARN TaskSetManager: Lost TID 1 (task 1.0:0)
> 14/07/16 14:34:10 WARN TaskSetManager: Loss was due to java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1405521243800
> not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 14/07/16 14:34:10 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting
> job
> 14/07/16 14:34:10 INFO DAGScheduler: Failed to run foreach at
> jsonfile.scala:42
> 14/07/16 14:34:10 ERROR JobScheduler: Error running job streaming job
> 1405521250000 ms.1
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on
> host localhost: java.lang.Exception: Could not compute split, block
> input-0-1405521243800 not found
>         org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>         at scala.Option.foreach(Option.scala:236)
>         at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> I am trying to enter data to kafka like
> {"type":"math","name":"srinivas","score":"10","school":"lfs"}
>
> I am thinking of some thing wrong with input RDD. Please let me know whats
> causing this error.
>
> Thanks,
> -Srinivas.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Reply via email to