[ 
https://issues.apache.org/jira/browse/SPARK-20671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

amit kumar closed SPARK-20671.
------------------------------
    Resolution: Not A Problem

My bad, I configured it wrong: using setMaster("local[*]") in place of
setMaster("local[2]") works.
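
For context: the receiver-based KafkaUtils.createStream pins one long-running task, and therefore one local thread, per input stream. With two topics under local[2], both threads are occupied by receivers and none is left to run the batch jobs, which is why the listener never gets past onBatchSubmitted. A minimal sketch of the sizing rule (minLocalThreads is an illustrative helper, not a Spark API):

```scala
// Each receiver-based input DStream permanently occupies one local thread;
// at least one additional thread must remain free to process batches.
def minLocalThreads(numReceivers: Int): Int = numReceivers + 1

// Two topics => two receivers => local[2] leaves no processing threads.
assert(minLocalThreads(2) > 2) // local[2] is insufficient; use local[3] or local[*]
```

So any master of local[n] with n >= numReceivers + 1 (or local[*] on a machine with enough cores) avoids the hang.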

> Processing multiple kafka topics with single spark streaming context hangs on 
> batchSubmitted.
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20671
>                 URL: https://issues.apache.org/jira/browse/SPARK-20671
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>         Environment: Ubuntu
>            Reporter: amit kumar
>
> object SparkMain extends App {
>   System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
>   val conf = new SparkConf()
>     .setMaster("local[2]")
>     .setAppName("kafkaspark")
>     .set("spark.streaming.concurrentJobs", "4")
>   val sc = new SparkContext(conf)
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val sqlContext = new SQLContext(sc)
>   val host = "localhost:2181"
>   val topicList = List("test", "fb")
>   topicList.foreach { topic =>
>     val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
>     //configureStream(topic, lines)
>     lines.foreachRDD(rdd =>
>       rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
>   }
>   ssc.addStreamingListener(new StreamingListener {
>     override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>       System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
>     }
>     override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
>       println("inside onReceiverStarted")
>     }
>     override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
>       println("inside onReceiverError")
>     }
>     override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
>       println("inside onReceiverStopped")
>     }
>     override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
>       println("inside onBatchSubmitted")
>     }
>     override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
>       println("inside onBatchStarted")
>     }
>   })
>   ssc.start()
>   println("===========================")
>   ssc.awaitTermination()
> }
> case class test(key: String)
> ========
> If I put only one topic in the list at a time, each topic works on its own.
> But when the topic list has more than one topic, after getting the DStream
> from the kafka topics, it keeps printing "inside onBatchSubmitted". Thanks
> in advance.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
