amit kumar closed SPARK-20671.
------------------------------
    Resolution: Not A Problem

My bad, I configured it wrong: setMaster("local[*]") in place of setMaster("local[2]") works.

> Processing multiple Kafka topics with a single Spark streaming context hangs on batchSubmitted.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20671
>                 URL: https://issues.apache.org/jira/browse/SPARK-20671
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>         Environment: Ubuntu
>            Reporter: amit kumar
>
> object SparkMain extends App {
>   System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
>   val conf = new SparkConf()
>     .setMaster("local[2]")
>     .setAppName("kafkaspark")
>     .set("spark.streaming.concurrentJobs", "4")
>   val sc = new SparkContext(conf)
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val sqlContext = new SQLContext(sc)
>   val host = "localhost:2181"
>   val topicList = List("test", "fb")
>   topicList.foreach { topic =>
>     val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
>     // configureStream(topic, lines)
>     lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
>   }
>   ssc.addStreamingListener(new StreamingListener {
>     override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>       System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
>     }
>     override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
>       println("inside onReceiverStarted")
>     }
>     override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
>       println("inside onReceiverError")
>     }
>     override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
>       println("inside onReceiverStopped")
>     }
>     override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
>       println("inside onBatchSubmitted")
>     }
>     override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
>       println("inside onBatchStarted")
>     }
>   })
>   ssc.start()
>   println("===========================")
>   ssc.awaitTermination()
> }
>
> case class test(key: String)
>
> ========
>
> If I run any one of the topics at a time, each topic works. But when the topic list has more than one topic, after getting the DStream from the Kafka topics, it keeps printing "inside onBatchSubmitted". Thanks in advance.
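For reference, a minimal sketch of the corrected setup (not taken from the ticket; the object name SparkMainFixed and the println sink are illustrative only). Each receiver-based KafkaUtils.createStream occupies one core for its receiver, so with two topics "local[2]" leaves no cores for processing the submitted batches and the job stalls after "inside onBatchSubmitted". Using "local[*]", or any local[N] with N greater than the number of receivers, resolves it. The Cassandra sink from the report is replaced with a println placeholder to keep the sketch self-contained.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object SparkMainFixed extends App {
      // Two receiver-based streams need two dedicated cores plus at least
      // one core for batch processing, so "local[2]" starves the job.
      val conf = new SparkConf()
        .setMaster("local[*]")   // was "local[2]"
        .setAppName("kafkaspark")

      val ssc = new StreamingContext(new SparkContext(conf), Seconds(5))

      val host = "localhost:2181"
      val topicList = List("test", "fb")

      topicList.foreach { topic =>
        // Same call shape as in the report: zkQuorum, group id, topic -> partition count.
        val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
        lines.foreachRDD(rdd => rdd.foreach(println)) // placeholder sink for this sketch
      }

      ssc.start()
      ssc.awaitTermination()
    }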