Hi Friends,

I am new to Spark and Spark Streaming. I am trying to save a DStream to files but can't figure out how to do it with the methods DStream provides (saveAsTextFiles). Here is what I am trying to do.
For example, given a DStream of type DStream[(String, String)] containing

    ("file1.txt", "msg_a"), ("file1.txt", "msg_b"), ("file2.txt", "msg_c"), ("file2.txt", "msg_d")

I want messages msg_a and msg_b to be stored in file1.txt, and msg_c and msg_d in file2.txt.

I got as far as grouping the stream on the first element of each tuple, but I can't figure out how to get the file name out of the stream and pass it to the saveAsTextFiles() method on DStream.

Here is what I have come up with so far. I am consuming messages from Kafka.

    import org.apache.spark.streaming.{DStream, Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._ // implicit conversions that add groupByKey() to pair DStreams

    object SparkStreamingTest2 {
      val sparkHome = "/Users/parth/Projects/spark-0.8.0-incubating"

      def main(args: Array[String]) {
        if (args.length < 5) {
          System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }

        val Array(master, zkQuorum, zkGroup, topics, numThreads) = args
        val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(4), sparkHome,
          List("target/scala-2.9.3/sparktest_2.9.3-0.0.1.jar"))
        ssc.checkpoint("checkpoint")

        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

        // lines are in format "filename.txt#msg_x"
        val lines: DStream[String] = ssc.kafkaStream(zkQuorum, zkGroup, topicMap)

        val fileNameAndMsgTuples = lines.map { line =>
          // split on the first '#' only, so a '#' inside the message does not break the match
          val Array(fileName, message) = line.split("#", 2)
          (fileName, message)
        }

        val groupedByFileName: DStream[(String, Seq[String])] = fileNameAndMsgTuples.groupByKey()

        // Save message groups to file ??
      }
    }

Thanks for your help in advance!

--
Best,
Parth
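A minimal sketch of one possible approach, assuming each batch is small enough to collect onto the driver and that writing to the driver's local disk is acceptable. PerKeyFileWriter, writeGroups and the output directory are hypothetical names for illustration, not Spark APIs. The function takes batches in the same (fileName, Seq[message]) shape that groupedByFileName produces and appends each group's messages to a file named after its key, so later batches accumulate in the same file.

```scala
import java.io.{File, FileWriter, PrintWriter}

// Hypothetical helper: not part of Spark. Writes grouped messages to per-key files.
object PerKeyFileWriter {

  /** Appends each group's messages, one per line, to outputDir/fileName. */
  def writeGroups(groups: Seq[(String, Seq[String])], outputDir: File): Unit = {
    outputDir.mkdirs() // create the (hypothetical) output directory if it is missing
    for ((fileName, messages) <- groups) {
      // FileWriter(file, true) opens in append mode, so later batches add to the same file
      val out = new PrintWriter(new FileWriter(new File(outputDir, fileName), true))
      try messages.foreach(m => out.println(m))
      finally out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = java.nio.file.Files.createTempDirectory("per-key-demo").toFile
    // The example batch from the question, already grouped by file name
    writeGroups(Seq("file1.txt" -> Seq("msg_a", "msg_b"), "file2.txt" -> Seq("msg_c", "msg_d")), dir)
    for (name <- Seq("file1.txt", "file2.txt")) {
      val src = scala.io.Source.fromFile(new File(dir, name))
      try println(name + ": " + src.getLines().mkString(", "))
      finally src.close()
    }
    // prints:
    // file1.txt: msg_a, msg_b
    // file2.txt: msg_c, msg_d
  }
}
```

To hook this into the stream above, one option in Spark 0.8.x should be DStream.foreach (later releases rename it foreachRDD), e.g. groupedByFileName.foreach(rdd => PerKeyFileWriter.writeGroups(rdd.collect(), new java.io.File("out"))). Because collect() pulls the whole batch onto the driver, this only suits low-volume streams. saveAsTextFiles(prefix, suffix), by contrast, writes each batch to its own output named from the prefix and the batch time, which is why it cannot route records to per-key files on its own.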