What code is triggering the stack overflow?

On Mon, Feb 29, 2016 at 11:13 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Hi All,
>
> I am getting the error below in a Spark Streaming application that uses Kafka
> as the input stream. With a socket input stream it worked fine, but
> after I switched to Kafka it fails. Does anyone have an idea why it is
> throwing this error? Do I need to change my batch interval or checkpoint interval?
>
>
>
> *ERROR StreamingContext: Error starting the context, marking it as
> stopped*
> *java.lang.StackOverflowError*
>
> My program:
>
> def main(args: Array[String]): Unit = {
>
>     // Function to create and setup a new StreamingContext
>     def functionToCreateContext(): StreamingContext = {
>       val conf = new SparkConf().setAppName("HBaseStream")
>       val sc = new SparkContext(conf)
>       // create a StreamingContext, the main entry point for all streaming 
> functionality
>       val ssc = new StreamingContext(sc, Seconds(5))
>       val brokers = args(0)
>       val topics= args(1)
>       val topicsSet = topics.split(",").toSet
>       val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>       val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
>         ssc, kafkaParams, topicsSet)
>
>       val inputStream = messages.map(_._2)
> //    val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
>       ssc.checkpoint(checkpointDirectory)
>       inputStream.print(1)
>       val parsedStream = inputStream
>         .map(line => {
>           val splitLines = line.split(",")
>           (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
>         })
>       import breeze.linalg.{DenseVector => BDV}
>       import scala.util.Try
>
>       val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
>         (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>           prev.map(_ +: current).orElse(Some(current))
>             .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>         })
>
>       state.checkpoint(Duration(10000))
>       state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>       ssc
>     }
>     // Get StreamingContext from checkpoint data or create a new one
>     val context = StreamingContext.getOrCreate(checkpointDirectory, 
> functionToCreateContext _)
>   }
> }
>
> Regards,
> ~Vinti
>

Reply via email to