Hi guys,
          I am using SparkStreaming to receive message from kafka,process it 
and then send back to kafka. however ,kafka consumer can not receive any 
messages. Any one share ideas?
 
here is my code:
 
object SparkStreamingSampleDirectApproach {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)  
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)  
    
    val Array(brokers, topics) = Array("localhost:9092", "topic1")
    val conf = new 
SparkConf().setMaster("local[2]").setAppName("SparkStreamingSampleDirectApproach").set("log4j.rootCategory",
 "WARN, console")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet) 
//    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    
    val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    
    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)
//    val messages2 = messages.map{line =>
//      new KeyedMessage[String, String](topic2,wordCounts.toString())
//    }.toArray
    
    val messages2 = new KeyedMessage[String, String](topic2,messages.toString())
    println(messages2)
    
    producer.send(messages2)
    
    ssc.start()
    ssc.awaitTermination() 
  }
}


--------------------------------


 
Thanks&Best regards!
罗辉 San.Luo

Reply via email to