Look, this is the whole program; I am not trying to serialize the JobConf anywhere myself.
 
// Imports were not shown in the original post; these assume Spark Streaming 1.x
// with the Kafka receiver. Note that saveAsNewAPIHadoopFiles requires the
// new-API TextOutputFormat from org.apache.hadoop.mapreduce, not mapred.
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]) {
    try {
      val properties = getProperties("settings.properties")
      StreamingExamples.setStreamingLogLevels()

      // Kafka consumer settings
      val zkQuorum = properties.get("zookeeper.list").toString()
      val topic = properties.get("topic.name").toString()
      val group = properties.get("group.name").toString()
      val threads = properties.get("consumer.threads").toString()
      val topicpMap = Map(topic -> threads.toInt)

      // HDFS locations for the checkpoint directory and the output data
      val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
      val hdfsCheckPointUrl = hdfsNameNodeUrl + properties.get("hdfs.checkpoint.path").toString()
      val hdfsDataUrl = hdfsNameNodeUrl + properties.get("hdfs.data.path").toString()
      val checkPointInterval = properties.get("spark.streaming.checkpoint.interval").toString().toInt

      val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
      println("=======================================================")
      println("kafka configuration: zk: " + zkQuorum + " ; topic: " + topic +
        " ; group: " + group + " ; threads: " + threads)
      println("=======================================================")

      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint(hdfsCheckPointUrl)

      val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      dStream.checkpoint(Seconds(checkPointInterval))

      // Save each batch of raw (key, value) pairs to HDFS as text files;
      // this is the step the exception points at
      dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String],
        classOf[String], classOf[TextOutputFormat[String, String]],
        ssc.sparkContext.hadoopConfiguration)

      // Parse each message value (a CSV line) into a DataObject
      val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
        DataObject(data(0), data(1), data(2), data(3), data(4), data(5), data(6),
          data(7), data(8).toLong, data(9), data(10), data(11), data(12).toLong,
          data(13), data(14)))

      // Count COMPLETE events over a sliding 15-minute window, evaluated every second
      val count = eventData.filter(_.state == "COMPLETE").countByWindow(Minutes(15), Seconds(1))
      count.map(cnt => "the total count of calls in COMPLETE state in the last 15 minutes is: " + cnt).print()

      ssc.start()
      ssc.awaitTermination()
    } catch {
      case e: Exception => println("exception caught: " + e)
    }
  }
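In case it helps, here is roughly what the two helpers referenced above look like (getProperties and DataObject). The DataObject field names below are placeholders: the real class has 15 fields, Longs at positions 8 and 12, and a String field called state whose position I've picked arbitrarily:

import java.io.FileInputStream
import java.util.Properties

// Sketch of the helper: loads settings.properties from the working directory
def getProperties(fileName: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(fileName)
  try props.load(in) finally in.close()
  props
}

// Placeholder field names; only the arity and the types at positions 8 and 12
// are implied by the call site above (the position of state is a guess)
case class DataObject(
    f0: String, f1: String, f2: String, f3: String, f4: String,
    f5: String, f6: String, f7: String, f8: Long, f9: String,
    f10: String, state: String, f12: Long, f13: String, f14: String)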



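For reference, one workaround I'm considering, based on the fact that Hadoop's Configuration (and its JobConf subclass) is not Java-serializable while an enabled checkpoint makes Spark serialize the DStream graph: build the Configuration inside the per-batch closure instead of capturing it, e.g.

// Sketch of a possible workaround: the closure captures only serializable
// values (a String and some Class objects); the Configuration is created
// per batch, on the driver, at execution time
dStream.foreachRDD { (rdd, time) =>
  val conf = new org.apache.hadoop.conf.Configuration()
  rdd.saveAsNewAPIHadoopFile(
    hdfsDataUrl + "-" + time.milliseconds + ".csv",
    classOf[String], classOf[String],
    classOf[TextOutputFormat[String, String]], conf)
}

No idea yet whether that addresses the actual exception here, but it keeps the JobConf/Configuration out of anything Spark tries to serialize.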