What code is triggering the stack overflow? On Mon, Feb 29, 2016 at 11:13 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
> Hi All, > > I am getting below error in spark-streaming application, i am using kafka > for input stream. When i was doing with socket, it was working fine. But > when i changed to kafka it's giving error. Anyone has idea why it's > throwing error, do i need to change my batch time and check pointing time? > > > > *ERROR StreamingContext: Error starting the context, marking it as > stoppedjava.lang.StackOverflowError* > > My program: > > def main(args: Array[String]): Unit = { > > // Function to create and setup a new StreamingContext > def functionToCreateContext(): StreamingContext = { > val conf = new SparkConf().setAppName("HBaseStream") > val sc = new SparkContext(conf) > // create a StreamingContext, the main entry point for all streaming > functionality > val ssc = new StreamingContext(sc, Seconds(5)) > val brokers = args(0) > val topics= args(1) > val topicsSet = topics.split(",").toSet > val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder]( > ssc, kafkaParams, topicsSet) > > val inputStream = messages.map(_._2) > // val inputStream = ssc.socketTextStream(args(0), args(1).toInt) > ssc.checkpoint(checkpointDirectory) > inputStream.print(1) > val parsedStream = inputStream > .map(line => { > val splitLines = line.split(",") > (splitLines(1), splitLines.slice(2, > splitLines.length).map((_.trim.toLong))) > }) > import breeze.linalg.{DenseVector => BDV} > import scala.util.Try > > val state: DStream[(String, Array[Long])] = > parsedStream.updateStateByKey( > (current: Seq[Array[Long]], prev: Option[Array[Long]]) => { > prev.map(_ +: current).orElse(Some(current)) > .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption) > }) > > state.checkpoint(Duration(10000)) > state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) > ssc > } > // Get StreamingContext from checkpoint data or create a new one > val context = StreamingContext.getOrCreate(checkpointDirectory, > functionToCreateContext _) > } > } > > Regards, > ~Vinti >