Hi Tathagata,

I could see the output of count, but no SQL results. Running it in standalone mode is meaningless for me, so I just run on my local single-node YARN cluster.

Thanks
On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Could you run it locally first to make sure it works, and you see output?
> Also, I recommend going through the previous step-by-step approach to
> narrow down where the problem is.
>
> TD
>
> On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Actually, I deployed this on a YARN cluster (spark-submit) and I couldn't
>> find any output in the YARN stdout logs.
>>
>> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das
>> <tathagata.das1...@gmail.com> wrote:
>>
>>> Can you make sure you are running locally on more than one local core?
>>> You could set the master in the SparkConf as conf.setMaster("local[4]").
>>> Then see if there are jobs running on every batch of data in the Spark
>>> web UI (running on localhost:4040). If you still don't get any output,
>>> first try simply printing recRDD.count() in the foreachRDD (that is,
>>> first test Spark Streaming). If you can get that to work, then I would
>>> test the Spark SQL stuff.
>>>
>>> TD
>>>
>>> On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>
>>>> No errors but no output either... Thanks!
>>>>
>>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das
>>>> <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Could you elaborate on what is the problem you are facing? Compiler
>>>>> error? Runtime error? Class-not-found error? Not receiving any data
>>>>> from Kafka? Receiving data but the SQL command throwing an error? No
>>>>> errors but no output either?
>>>>>
>>>>> TD
>>>>>
>>>>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> A couple of days ago, I tried to integrate SQL and streaming
>>>>>> together. My understanding is that I can transform the RDDs from a
>>>>>> DStream into SchemaRDDs and execute SQL on each RDD, but I've had
>>>>>> no luck.
>>>>>> Would you guys help me take a look at my code? Thank you very much!
>>>>>>
>>>>>> object KafkaSpark {
>>>>>>
>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>     if (args.length < 4) {
>>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>>       System.exit(1)
>>>>>>     }
>>>>>>
>>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>>>>>     val sc = new SparkContext(sparkConf)
>>>>>>     val sqlContext = new SQLContext(sc)
>>>>>>     // ssc.checkpoint("checkpoint")
>>>>>>
>>>>>>     // Importing the SQL context gives access to all the SQL
>>>>>>     // functions and implicit conversions.
>>>>>>     import sqlContext._
>>>>>>
>>>>>>     val tt = Time(10000)
>>>>>>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>       topicpMap).map(t => getRecord(t._2.split("#")))
>>>>>>
>>>>>>     val result = recordsStream.foreachRDD((recRDD, tt) => {
>>>>>>       recRDD.registerAsTable("records")
>>>>>>       val result = sql("select * from records")
>>>>>>       println(result)
>>>>>>       result.foreach(println)
>>>>>>     })
>>>>>>
>>>>>>     ssc.start()
>>>>>>     ssc.awaitTermination()
>>>>>>   }
>>>>>>
>>>>>>   def getRecord(l: Array[String]): Record = {
>>>>>>     println("Getting the record")
>>>>>>     Record(l(0), l(1))
>>>>>>   }
>>>>>> }
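
For reference, a minimal sketch of the Streaming-plus-SQL pattern being debugged above, assuming the Spark 1.0-era APIs used in the thread (KafkaUtils.createStream, SQLContext, registerAsTable) and a two-field Record shape inferred from getRecord (the field names are assumptions). It differs from the posted code in two ways: it reuses the SparkContext that the StreamingContext already created instead of constructing a second SparkContext from the same conf, and it collects each query's result back to the driver so the rows land in the driver log rather than in per-executor stdout, which is scattered on YARN.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Two-field record shape inferred from getRecord; field names are assumptions.
case class Record(key: String, value: String)

object KafkaSparkSketch {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaSparkSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Reuse the SparkContext the StreamingContext created, rather than
    // building a second SparkContext from the same conf.
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext._ // createSchemaRDD implicit (RDD[Record] -> SchemaRDD) and sql()

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val records = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map { case (_, line) =>
        val fields = line.split("#")
        Record(fields(0), fields(1))
      }

    records.foreachRDD { rdd =>
      // Re-register each batch's RDD under the same table name.
      rdd.registerAsTable("records")
      // collect() pulls the rows to the driver so they appear in the
      // driver log instead of per-executor stdout.
      sql("select key, value from records").collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

This is only a sketch under those assumptions; whether collect() is acceptable depends on batch size, since it brings every row of the batch onto the driver.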