Hi All,

I am getting the error below in my Spark Streaming application, which uses Kafka for the input stream. When I was using a socket source it worked fine, but after switching to Kafka it throws this error. Does anyone have an idea why? Do I need to change my batch interval or checkpointing interval?
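To make the second part of my question concrete, here is a minimal, self-contained sketch of the two intervals I mean: the batch interval passed to the StreamingContext and the checkpoint interval set on the stateful DStream. The socket host/port and checkpoint directory are placeholders; my actual program is further down.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object IntervalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IntervalSketch").setMaster("local[2]")

    // Batch interval: a new micro-batch is generated every 5 seconds.
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/interval-sketch")                // placeholder checkpoint directory

    val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source
    val counts = lines.map(word => (word, 1L)).updateStateByKey[Long](
      (current: Seq[Long], prev: Option[Long]) => Some(current.sum + prev.getOrElse(0L)))

    // Checkpoint interval of the stateful stream, set to a multiple of the batch interval.
    counts.checkpoint(Seconds(25))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The Spark Streaming guide suggests a DStream checkpoint interval of roughly 5-10 batch intervals; my real job below uses a 5-second batch with a 10-second state checkpoint, so part of my question is whether that ratio matters here.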
*ERROR StreamingContext: Error starting the context, marking it as stopped*
*java.lang.StackOverflowError*

My program:

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

  def main(args: Array[String]): Unit = {
    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(5))

      val brokers = args(0)
      val topics = args(1)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      val inputStream = messages.map(_._2)
      // val inputStream = ssc.socketTextStream(args(0), args(1).toInt)

      ssc.checkpoint(checkpointDirectory)   // checkpointDirectory is defined elsewhere in the object
      inputStream.print(1)

      val parsedStream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
        })

      import breeze.linalg.{DenseVector => BDV}
      import scala.util.Try

      val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
          prev.map(_ +: current).orElse(Some(current))
            .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
        })

      state.checkpoint(Duration(10000))
      state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()            // the "Error starting the context" error above is logged at this point
    context.awaitTermination()
  }
} // end of the enclosing object

Regards,
~Vinti
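P.S. In case the dependency setup matters, this is a sketch of what the build needs for the Kafka direct stream and Breeze; the Scala and library versions below are placeholders, not necessarily what I am actually running:

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  "org.scalanlp"     %% "breeze"                % "0.11.2"
)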