Hi All,

I am getting the error below in my Spark Streaming application, which uses
Kafka as the input stream. When I was reading from a socket it worked fine,
but after I switched to Kafka it fails with an error. Does anyone have an
idea why it is throwing this error? Do I need to change my batch interval
and checkpointing interval?

*ERROR StreamingContext: Error starting the context, marking it as stopped*

My program:

def main(args: Array[String]): Unit = {

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all
streaming functionality
      val ssc = new StreamingContext(sc, Seconds(5))
      val brokers = args(0)
      val topics= args(1)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      val inputStream = messages.map(_._2)
//    val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
      val parsedStream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2,
      import breeze.linalg.{DenseVector => BDV}
      import scala.util.Try

      val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
          prev.map(_ +: current).orElse(Some(current))
            .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)

      state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext _)


