If I reduce the app to the following code, I no longer see the exception. It
creates the Hadoop files, but they are empty: the DStream contents are never
written out to the files!
/**
 * Entry point: loads settings from `settings.properties`, opens a Kafka
 * stream with a 1-second batch interval and saves every batch under the
 * configured HDFS data URL through `TextOutputFormat`.
 *
 * Any exception raised during setup, or rethrown on this thread by
 * `awaitTermination()`, is printed together with its stack trace.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  try {
    val properties = getProperties("settings.properties")

    // Look up a mandatory setting. A missing key fails with an error that
    // names the key instead of an anonymous NullPointerException.
    // NOTE(review): assumes properties.get returns null for an absent key
    // (java.util.Properties-style) -- confirm against getProperties.
    def required(key: String): String =
      Option(properties.get(key))
        .map(_.toString)
        .getOrElse(throw new IllegalArgumentException(s"missing required property: $key"))

    StreamingExamples.setStreamingLogLevels()

    val zkQuorum = required("zookeeper.list")
    val topic = required("topic.name")
    val group = required("group.name")
    val threads = required("consumer.threads")
    val topicMap = Map(topic -> threads.toInt)

    val hdfsNameNodeUrl = required("hdfs.namenode.url")
    // The checkpoint URL and interval are read (so a missing key still fails
    // early) but are not used anywhere else in this reduced version.
    val hdfsCheckPointUrl = hdfsNameNodeUrl + required("hdfs.checkpoint.path")
    val hdfsDataUrl = hdfsNameNodeUrl + required("hdfs.data.path")
    val checkPointInterval = required("spark.streaming.checkpoint.interval").toInt

    val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")

    val banner = "======================================================="
    println(banner)
    println(s"kafka configuration: zk: $zkQuorum ; topic: $topic ; group: $group ; threads:$threads")
    println(banner)

    // NOTE(review): no master is set here, so it comes from the launcher.
    // A receiver-based stream permanently occupies one core; if the job is
    // given only one (e.g. master "local" or "local[1]") no batch is ever
    // processed, which could explain empty output files -- confirm the
    // master/cores used when submitting.
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    dStream.saveAsNewAPIHadoopFiles(
      hdfsDataUrl,
      "csv",
      classOf[String],
      classOf[String],
      classOf[TextOutputFormat[String, String]],
      ssc.sparkContext.hadoopConfiguration)

    ssc.start()
    ssc.awaitTermination()
  } catch {
    case e: Exception =>
      println("exception caught: " + e)
      // Keep the full trace: the one-line message alone hides where the
      // failure originated.
      e.printStackTrace()
  }
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-saving-kafka-DStream-into-hadoop-throws-exception-tp12202p12213.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]