Is anyone producing any (new) messages to the topics you are subscribing to in that consumer?
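
The reason for asking: the quoted code calls poll() once with a 1000 ms timeout and then closes the consumer, and a group with no committed offsets starts from the latest offset by default (auto.offset.reset=latest). Only records produced during that short window could be printed. Below is a minimal sketch of a consumer that keeps polling, assuming the same "test" topic and group id as the quoted code. The object name ConsumerLoopSketch, the helpers consumerProps and formatRecord, the maxPolls parameter, and the choice of auto.offset.reset=earliest are illustrative assumptions, not part of the original code.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Hypothetical sketch; names and the "earliest" setting are assumptions.
object ConsumerLoopSketch {

  // Same settings as the quoted code, plus auto.offset.reset.
  def consumerProps(bootstrapServers: String): Properties = {
    val p = new Properties()
    p.put("bootstrap.servers", bootstrapServers)
    p.put("group.id", "newconsumper")
    p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.put("enable.auto.commit", "true")
    // Assumption: read from the start of the log when the group has no committed offset.
    p.put("auto.offset.reset", "earliest")
    p
  }

  // Formats one record the same way the quoted println does.
  def formatRecord(key: String, value: String, offset: Long): String =
    s"$key:$value:$offset"

  // Polls maxPolls times instead of once, then closes the consumer.
  def run(bootstrapServers: String, maxPolls: Int): Unit = {
    val consumer = new KafkaConsumer[String, String](consumerProps(bootstrapServers))
    try {
      consumer.subscribe(Collections.singletonList("test"))
      for (_ <- 1 to maxPolls) {
        // poll(long) matches the 0.10.x client; newer clients take a java.time.Duration.
        val records = consumer.poll(1000L)
        records.asScala.foreach { r =>
          println(formatRecord(r.key(), r.value(), r.offset()))
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```

Calling ConsumerLoopSketch.run("myhostName:9092", 30) would poll for roughly 30 seconds. If records show up with this loop but not with the single poll, the single short poll is the likely cause rather than the listener configuration.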

-Jaikiran
On Friday 26 August 2016 10:14 AM, Jack Yang wrote:
Hi all,
I am using Kafka 0.10.0.1, and I set up my listeners like this:

listeners=PLAINTEXT://myhostName:9092

Then I have one consumer running that uses the new API. However, I did not see
anything returned from the API.
The log from Kafka is:

[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment received from leader for group newconsumper for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group newconsumper generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)


Here is the code:
p.put("bootstrap.servers", config.getString("kafka.broker"))
p.put("group.id", "newconsumper")
p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
p.put("enable.auto.commit", "true")
p.put("heartbeat.interval.ms", "10000")
p.put("session.timeout.ms", "30000")

val kafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](p)
kafkaConsumer.subscribe(util.Arrays.asList("test"))
try {
  val timeout: Long = 1000
  val records: ConsumerRecords[String, String] = kafkaConsumer.poll(timeout)

  records.asScala.foreach(record => {
    println(record.key() + ":" + record.value() + ":" + record.offset())
  })
} catch {
  case e: Exception => {
    e.printStackTrace
  }
} finally {
  kafkaConsumer.close
}

Best regards,
Jack


