Hi guys, I am using SparkStreaming to receive message from kafka,process it and then send back to kafka. however ,kafka consumer can not receive any messages. Any one share ideas? here is my code: object SparkStreamingSampleDirectApproach { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val Array(brokers, topics) = Array("localhost:9092", "topic1") val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSampleDirectApproach").set("log4j.rootCategory", "WARN, console") val ssc = new StreamingContext(conf, Seconds(1)) val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) // messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test") val lines = messages.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() val Array(brokers2, topic2) = Array("localhost:9092", "topic2") val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) // val messages2 = messages.map{line => // new KeyedMessage[String, String](topic2,wordCounts.toString()) // }.toArray val messages2 = new KeyedMessage[String, String](topic2,messages.toString()) println(messages2) producer.send(messages2) ssc.start() ssc.awaitTermination() } }
-------------------------------- Thanks&Best regards! 罗辉 San.Luo