Recently we used Apache Flink to consume kafka topics with a regex pattern. It 
is found that when we deleted some unused topics, the logs will keep flushing 
UNKNOWN_TOPIC_EXCEPTION.

I looked up the source code of kafka client and found that for consumer, 
topicExpiry is disable in Metadata, which leads to that the client still manage 
deleted topic's information in the metadata's topic list and keep fetching them 
from servers.

Is there any good method to avoid this annoying warning logs without modify the 
kafka's source code? (We still need the 'Real' Unknown topic exception, which 
means not the outdated topic, in logs)

kafka client version: 1.0

The following code can be used to reproduce this problem (if you create 
multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster 
and then delete any of one while running).

public static void main(String [] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092\n");
        props.put("group.id", "test10");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("metadata.max.age.ms", "60000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(props);

        class PartitionOffsetAssignerListener implements 
ConsumerRebalanceListener {

            private KafkaConsumer<String, String> consumer;

            public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) 
{
                this.consumer = kafkaConsumer;
            }

            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {

            }

            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
                //reading all partitions from the beginning
                consumer.seekToBeginning(partitions);
            }
        }

        consumer.subscribe(Pattern.compile("^test.*$"), new 
PartitionOffsetAssignerListener(consumer));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", 
record.offse

Reply via email to