Do you have enough messages in Kafka to consume? Can you make sure your Kafka setup is working with the console consumer? Also, try this example: <https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala>
Thanks Best Regards On Mon, Mar 30, 2015 at 11:04 AM, <luohui20...@sina.com> wrote: > Hi guys, > > I am using SparkStreaming to receive message from kafka,process > it and then send back to kafka. however ,kafka consumer can not receive any > messages. Any one share ideas? > > > > here is my code: > > > > object SparkStreamingSampleDirectApproach { > def main(args: Array[String]): Unit = { > Logger.getLogger("org.apache.spark").setLevel(Level.WARN) > Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) > > > val Array(brokers, topics) = Array("localhost:9092", "topic1") > val conf = new > SparkConf().setMaster("local[2]").setAppName("SparkStreamingSampleDirectApproach").set("log4j.rootCategory", > "WARN, console") > val ssc = new StreamingContext(conf, Seconds(1)) > > val topicsSet = topics.split(",").toSet > val kafkaParams = Map[String, String]("metadata.broker.list" -> > brokers) > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) > // messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test") > val lines = messages.map(_._2) > val words = lines.flatMap(_.split(" ")) > val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) > wordCounts.print() > > val Array(brokers2, topic2) = Array("localhost:9092", "topic2") > val props = new Properties() > props.put("metadata.broker.list", brokers) > props.put("serializer.class", "kafka.serializer.StringEncoder") > > val config = new ProducerConfig(props) > val producer = new Producer[String, String](config) > > // val messages2 = messages.map{line => > // new KeyedMessage[String, String](topic2,wordCounts.toString()) > // }.toArray > > val messages2 = new KeyedMessage[String, > String](topic2,messages.toString()) > println(messages2) > > producer.send(messages2) > > ssc.start() > ssc.awaitTermination() > } > } > > -------------------------------- > > Thanks&Best regards! > 罗辉 San.Luo >