[
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shengnan YU updated KAFKA-8100:
---
Description:
Recently we used Flink to consume Kafka topics with a regex pattern. We found
that after deleting some unused (outdated) topics, the logs keep being flooded
with UNKNOWN_TOPIC_EXCEPTION until the program is restarted.
I studied the source code of the Kafka client and found that, for the
consumer, topicExpiry is disabled in Metadata. As a result, even after a topic
is deleted, the client still has that topic cached in the metadata's topic
list and keeps fetching information about it from the brokers.
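To make the effect of disabled topic expiry concrete, here is a minimal toy
model of a client-side topic cache. It is only a sketch and does not use or
reproduce Kafka's Metadata class: the class TopicCacheSketch, its methods, and
the NO_EXPIRY sentinel are hypothetical names chosen for illustration, and the
timestamps are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a client-side topic cache; NOT Kafka's actual Metadata class. */
class TopicCacheSketch {
    /** Hypothetical sentinel meaning "this topic never expires". */
    static final long NO_EXPIRY = -1L;

    private final boolean topicExpiryEnabled;
    private final long expiryMs;
    // topic name -> time (ms) at which it should be dropped, or NO_EXPIRY
    private final Map<String, Long> topics = new HashMap<>();

    TopicCacheSketch(boolean topicExpiryEnabled, long expiryMs) {
        this.topicExpiryEnabled = topicExpiryEnabled;
        this.expiryMs = expiryMs;
    }

    /** Called whenever the application shows interest in a topic. */
    void add(String topic, long nowMs) {
        topics.put(topic, topicExpiryEnabled ? nowMs + expiryMs : NO_EXPIRY);
    }

    /** Simulates a metadata refresh: drops expired topics, returns those still requested. */
    Set<String> topicsToFetch(long nowMs) {
        topics.values().removeIf(expiresAt -> expiresAt != NO_EXPIRY && expiresAt <= nowMs);
        return new TreeSet<>(topics.keySet());
    }

    public static void main(String[] args) {
        TopicCacheSketch withExpiry = new TopicCacheSketch(true, 1_000L);
        TopicCacheSketch withoutExpiry = new TopicCacheSketch(false, 1_000L);
        for (TopicCacheSketch cache : new TopicCacheSketch[] {withExpiry, withoutExpiry}) {
            cache.add("test1", 0L);
            cache.add("test2", 0L);
        }
        // "test2" is deleted on the broker and never used again; only "test1" stays in use.
        withExpiry.add("test1", 900L);
        withoutExpiry.add("test1", 900L);

        System.out.println("expiry on:  " + withExpiry.topicsToFetch(1_500L));
        System.out.println("expiry off: " + withoutExpiry.topicsToFetch(1_500L));
    }
}
```

With expiry enabled the sketch prints [test1]; with expiry disabled it prints
[test1, test2], i.e. the deleted topic is still requested on every refresh,
which matches the behaviour described above.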
The following code can be used to reproduce the problem (create multiple
topics such as "test1", "test2", "test3"..."testn" in the Kafka cluster and
then delete any one of them while the consumer is running).
{code:java}
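import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// The class name is illustrative; the original report only contained main().
public class RegexSubscriptionRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test10");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Very short metadata max age (6 ms), so metadata is refreshed almost continuously.
        props.put("metadata.max.age.ms", "6");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
            private final KafkaConsumer<String, String> consumer;

            public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
                this.consumer = kafkaConsumer;
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Read all assigned partitions from the beginning.
                consumer.seekToBeginning(partitions);
            }
        }

        // Subscribe to every topic whose name starts with "test".
        consumer.subscribe(Pattern.compile("^test.*$"),
                new PartitionOffsetAssignerListener(consumer));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}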
{code}
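For reference, the behaviour one would expect from the reproduction is that
the set of topics matched by the pattern shrinks as soon as a topic is deleted.
The following sketch shows only that expectation with plain java.util.regex.
The helper matchingTopics and the topic lists are hypothetical and are not part
of the Kafka client API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Pattern;

class PatternSubscriptionSketch {
    /** Returns the topics from a cluster listing whose names fully match the pattern. */
    static SortedSet<String> matchingTopics(Pattern pattern, Collection<String> clusterTopics) {
        SortedSet<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^test.*$");
        // Hypothetical cluster listings before and after "test2" is deleted.
        System.out.println(matchingTopics(pattern, Arrays.asList("test1", "test2", "test3", "other")));
        System.out.println(matchingTopics(pattern, Arrays.asList("test1", "test3", "other")));
    }
}
```

In this sketch "test2" disappears from the matched set once it is no longer in
the listing, whereas the consumer in the report keeps requesting it.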
was:
Recently we used Flink to consume Kafka topics with a regex pattern. We found
that after deleting some unused topics, the logs keep being flooded with
UNKNOWN_TOPIC_EXCEPTION.
I studied the source code of the Kafka client and found that, for the
consumer, topicExpiry is disabled in Metadata. As a result, even after a topic
is deleted, the client still has that topic in the metadata's topic list and
keeps fetching it from the servers.
Is there a good way to avoid these annoying warning logs without modifying
Kafka's source code? (We still need the 'real' unknown topic exceptions, i.e.
those not caused by outdated topics, in the logs.)
The following code can be used to reproduce the problem (create multiple
topics such as "test1", "test2", "test3"..."testn" in the Kafka cluster and
then delete any one of them while the consumer is running).
{code:java}
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test10");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("metadata.max.age.ms", "6");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
        private final KafkaConsumer<String, String> consumer;

        public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
            this.consumer = kafkaConsumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Read all assigned partitions from the beginning.
            consumer.seekToBeginning(partitions);
        }
    }

    consumer.subscribe(Pattern.compile("^test.*$"),
            new PartitionOffsetAssignerListener(consumer));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());