Also, you could use a Producer singleton to improve performance. Right now you create a Producer for each partition in each batch interval; instead, you could create a singleton object and reuse it (the Producer is thread-safe, as far as I know).
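A minimal sketch of what such a singleton could look like, using the same Kafka 0.8 producer API as the code below; the object name and the getOrCreate helper are only illustrative, not an existing API:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Illustrative singleton: one lazily-created Producer per executor JVM,
// reused across partitions and batches instead of being rebuilt inside
// every foreachPartition call.
object KafkaProducerHolder {
  @volatile private var producer: Producer[String, String] = _

  def getOrCreate(brokers: String): Producer[String, String] = {
    if (producer == null) synchronized {
      // double-checked locking so concurrent tasks create it only once
      if (producer == null) {
        val props = new Properties()
        props.put("metadata.broker.list", brokers)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        producer = new Producer[String, String](new ProducerConfig(props))
      }
    }
    producer
  }
}

// Usage inside the streaming job (messages as in the code below):
messages.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // runs on the executor; reuses the JVM-wide producer
    val producer = KafkaProducerHolder.getOrCreate("localhost:9092")
    records.foreach { case (_, value) =>
      producer.send(new KeyedMessage[String, String]("topic2", value))
    }
  }
}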
-Jerry

2015-03-30 15:13 GMT+08:00 Saisai Shao <sai.sai.s...@gmail.com>:

> Yeah, after reviewing your code again: the reason you could not receive
> any data is that your previous code lacked an ACTION on the DStream, so
> the code never actually executed. After you changed to the style I
> mentioned, `foreachRDD` triggers and runs the jobs as you wrote them.
>
> Yes, your understanding is correct.
>
> Thanks
> Jerry
>
> 2015-03-30 14:58 GMT+08:00 <luohui20...@sina.com>:
>
>> To Saisai:
>>
>> It works after I corrected parts of it following your advice, as below.
>> Furthermore, I am not quite clear about which code runs on the driver
>> and which runs on the executors, so I wrote my understanding in the
>> comments. Would you help check? Thank you.
>>
>> To Akhil:
>>
>> Yes, Kafka has enough messages. I also tested it with a Kafka producer
>> sending random Scala Ints, and it works. Thanks.
>>
>> object SparkStreamingSampleDirectApproach2 {
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>>
>>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>>     val conf = new SparkConf().setMaster("local[2]")
>>       .setAppName("SparkStreamingSampleDirectApproach")
>>       .set("log4j.rootCategory", "WARN, console")
>>     val ssc = new StreamingContext(conf, Seconds(1))
>>
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>     // messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>>     // val lines = messages.map(_._2)
>>     // val words = lines.flatMap(_.split(" "))
>>     // val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>     // wordCounts.print()
>>
>>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>>     val props = new Properties()
>>     props.put("metadata.broker.list", brokers2)
>>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>>
>>     messages.foreachRDD { rdd =>
>>       // above code runs on the driver
>>       rdd.foreachPartition { record =>
>>         // code from here runs on the executors
>>         val config = new ProducerConfig(props)
>>         val producer = new Producer[String, String](config)
>>         record.foreach { piece =>
>>           val msg = new KeyedMessage[String, String](topic2, piece._2)
>>           producer.send(msg)
>>         }
>>       }
>>     }
>>
>>     // val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>>     // println(messages2)
>>
>>     ssc.start() // again, runs on the driver
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> --------------------------------
>>
>> Thanks & Best regards!
>> 罗辉 San.Luo
>>
>> ----- Original Message -----
>> From: Saisai Shao <sai.sai.s...@gmail.com>
>> To: luohui20...@sina.com
>> Cc: user <user@spark.apache.org>
>> Subject: Re: How SparkStreaming output messages to Kafka?
>> Date: 2015-03-30 14:03
>>
>> Hi Hui,
>>
>> Did you try the direct Kafka stream example among Spark Streaming's
>> examples? Does it still fail to receive the message? Also, would you
>> please check the whole setup, including Kafka itself: test with the
>> Kafka console consumer to see whether Kafka is OK.
>>
>> Besides, looking at your code, there are some problems here:
>>
>> val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>> println(messages2)
>> producer.send(messages2)
>>
>> This code snippet is not lazily evaluated; it is executed ONLY ONCE when
>> the program reaches this point, so you may not actually be writing any
>> data into Kafka. You need to write it like this:
>>
>> messages.foreachRDD { r =>
>>   r.foreachPartition { iter =>
>>     // create the Producer here
>>     // turn this partition of data (iter) into KeyedMessages and write
>>     // them into Kafka
>>   }
>> }
>>
>> This is the basic style; sorry for any missing parts or typos. Also pay
>> attention to serialization issues, since this code is shipped to and run
>> on the remote executors. Please take another try.
>>
>> Thanks
>> Jerry
>>
>> 2015-03-30 13:34 GMT+08:00 <luohui20...@sina.com>:
>>
>> Hi guys,
>>
>> I am using Spark Streaming to receive messages from Kafka, process them,
>> and then send them back to Kafka. However, the Kafka consumer cannot
>> receive any messages. Anyone have ideas to share?
>>
>> Here is my code:
>>
>> object SparkStreamingSampleDirectApproach {
>>   def main(args: Array[String]): Unit = {
>>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>>
>>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>>     val conf = new SparkConf().setMaster("local[2]")
>>       .setAppName("SparkStreamingSampleDirectApproach")
>>       .set("log4j.rootCategory", "WARN, console")
>>     val ssc = new StreamingContext(conf, Seconds(1))
>>
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>     // messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>>     val lines = messages.map(_._2)
>>     val words = lines.flatMap(_.split(" "))
>>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>     wordCounts.print()
>>
>>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>>     val props = new Properties()
>>     props.put("metadata.broker.list", brokers)
>>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>>
>>     val config = new ProducerConfig(props)
>>     val producer = new Producer[String, String](config)
>>
>>     // val messages2 = messages.map { line =>
>>     //   new KeyedMessage[String, String](topic2, wordCounts.toString())
>>     // }.toArray
>>
>>     val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>>     println(messages2)
>>
>>     producer.send(messages2)
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> --------------------------------
>>
>> Thanks & Best regards!
>> 罗辉 San.Luo