I am testing with one partition now. I am using Kafka 0.9 and Spark 1.6.1 
(Scala 2.11). Just start with one topic first and then add more. I am not 
partitioning the topic.
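
A minimal sketch of what "add more topics later" could look like, assuming the comma-separated topics string from the example quoted below. The helper name parseTopics and the second topic name "test2" are hypothetical and used only for illustration:

```scala
object TopicsSketch {
  // Turn a comma-separated topic list into the Set[String] that the direct
  // stream takes as its topics argument. Whitespace around names is trimmed
  // and empty entries are dropped.
  def parseTopics(csv: String): Set[String] =
    csv.split(",").map(_.trim).filter(_.nonEmpty).toSet

  def main(args: Array[String]): Unit = {
    val oneTopic = parseTopics("test")          // start with a single topic
    val moreTopics = parseTopics("test, test2") // "test2" is a hypothetical second topic
    println(oneTopic)
    println(moreTopics)
  }
}
```

Only the string changes when a topic is added; the resulting set is passed to createDirectStream the same way as topicsSet in the quoted example.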

HTH, 

Regards,

Sivakumaran

> On 10-Aug-2016, at 5:56 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
> wrote:
> 
> Hi Siva,
> 
> Does the topic have partitions? Which version of Spark are you using?
> 
> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
> Hi,
> 
> Here is a working example I did.
> 
> HTH
> 
> Regards,
> 
> Sivakumaran S
> 
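> import kafka.serializer.StringDecoder
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
> 
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> // Local master for testing; alternatively spark://localhost:7077
> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local")
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60)) // 60-second batches
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   // Stop if a batch arrives with no records
>   if (rdd.isEmpty()) {
>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>     System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   // Each record is a (key, value) pair; parse the JSON values into a DataFrame
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   //Process your DF as required here on
> })
> // The stream only runs once the context is started
> ssc.start()
> ssc.awaitTermination()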
> 
> 
> 
>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I am reading JSON messages from Kafka. The topic has 2 partitions. When 
>> running the streaming job with spark-submit, I can see that val dataFrame = 
>> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing 
>> something wrong here? The code is below. This environment is the Cloudera 
>> sandbox. The same issue occurs on the Hadoop production cluster, but that 
>> environment is restricted, which is why I tried to reproduce the issue in 
>> the Cloudera sandbox. Versions: Kafka 0.10 and Spark 1.4.
>> 
>> val kafkaParams = Map[String, String](
>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>>   "group.id" -> "xyz",
>>   "auto.offset.reset" -> "smallest")
>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> val ssc = new StreamingContext(conf, Seconds(1))
>> 
>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> 
>> val topics = Set("gpp.minf")
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>   ssc, kafkaParams, topics)
>> 
>> kafkaStream.foreachRDD(rdd => {
>>   if (rdd.count > 0) {
>>     val dataFrame = sqlContext.read.json(rdd.map(_._2))
>>     dataFrame.printSchema()
>>     //dataFrame.foreach(println)
>>   }
>> })
> 
> 
