Hi Team,



I'm using KafkaConsumer to consume messages from topics on a Kafka server.
*         It works fine for topics created before the consumer code starts.
*         However, it does not pick up topics that are created dynamically
(that is, after the consumer code has started), even though the API
documentation says dynamic topic creation is supported.



Kafka version used : 0.9.0.1

Here is the link for reference:

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html



Here is the Java code:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    // Offsets are committed manually below, so auto commit is disabled.
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    // Subscribe by pattern; HandleRebalance is my rebalance listener (not shown).
    consumer.subscribe(r, new HandleRebalance());
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
                }
                // Commit the offset of the next record to read for this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
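
One setting that may be relevant here: as far as I understand, a pattern
subscription is only re-evaluated when the consumer refreshes its cluster
metadata, and that interval is controlled by metadata.max.age.ms (default
300000 ms, i.e. 5 minutes). The sketch below only shows how the property could
be lowered; the class name, the helper method and the 5000 ms value are
illustrative assumptions, and I have not confirmed that this resolves the
behaviour described above.

```java
import java.util.Properties;

public class MetadataAgeSketch {

    // Hypothetical helper: copies the given consumer properties and sets a
    // shorter metadata refresh interval on the copy.
    static Properties withMetadataMaxAge(Properties base, long maxAgeMs) {
        Properties props = new Properties();
        props.putAll(base);
        // "metadata.max.age.ms" is the consumer config key for the metadata refresh period.
        props.put("metadata.max.age.ms", Long.toString(maxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");
        base.put("group.id", "test");

        // 5000 ms is an arbitrary value chosen for testing, not a recommendation.
        Properties props = withMetadataMaxAge(base, 5000L);
        System.out.println(props.getProperty("metadata.max.age.ms"));
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer
constructor in place of the one above.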


NOTE: My topic names match the regular expression, and if I restart the
consumer it starts reading the messages pushed to the new topics.
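
To illustrate what I mean by the names matching, here is a small standalone
check of the same regular expression. It assumes the consumer requires the
whole topic name to match the pattern (Matcher.matches()), and the topic names
in the list are made-up examples, not my real topics.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternCheck {

    // Same pattern as in the consumer code above.
    static final Pattern TOPIC_PATTERN = Pattern.compile("siddu(\\d)*");

    // Returns true only if the entire topic name matches the pattern
    // (assumption: the consumer applies a full match, not a partial find).
    static boolean matchesSubscription(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names used only for illustration.
        List<String> topics = Arrays.asList("siddu", "siddu1", "siddu42", "siddu-new", "mysiddu1");
        for (String topic : topics) {
            System.out.println(topic + " -> " + matchesSubscription(topic));
        }
    }
}
```

Under that assumption, "siddu", "siddu1" and "siddu42" match, while
"siddu-new" and "mysiddu1" do not.
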
Any help is really appreciated.
Thanks,
Siddu
FICO Bangalore.
Ph : +91 - 9845234534
