[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817951#comment-16817951
 ] 

Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM:
-------------------------------------------------------------

Hi could you please explain why not delete this topic info in metadata first 
when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if 
the topic actually exists.


was (Author: ysn2233):
Hi could you please explain why not delete this topic info in metadata first 
when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily recoverable if 
the topic actually exists.

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8100
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8100
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.1, 2.1.1
>            Reporter: Shengnan YU
>            Priority: Major
>
> Recently we used flink to consume kafka topics with a regex pattern. It is 
> found that when we deleted some unused topics, the logs will keep flushing 
> UNKNOWN_TOPIC_EXCEPTION.
> I study the source code of kafka client, it is found that for consumer, 
> topicExpiry is disable in Metadata, which leads to that the even the topic 
> deleted, the client still have this topic info in the metadata's topic list 
> and keep fetching from servers.
> Is there any good method to avoid this annoying warning logs without modify 
> the kafka's source code? (We still need the 'Real' Unknown topic exception, 
> which means not the outdated topic, in logs)
> The following code can be used to reproduce this problem (if you create 
> multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster 
> and then delete any of one while running).
> {code:java}
> public static void main(String [] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092\n");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.max.age.ms", "60000");
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
> String>(props);
>         class PartitionOffsetAssignerListener implements 
> ConsumerRebalanceListener {
>             private KafkaConsumer<String, String> consumer;
>             public PartitionOffsetAssignerListener(KafkaConsumer 
> kafkaConsumer) {
>                 this.consumer = kafkaConsumer;
>             }
>             public void onPartitionsRevoked(Collection<TopicPartition> 
> partitions) {
>             }
>             public void onPartitionsAssigned(Collection<TopicPartition> 
> partitions) {
>                 //reading all partitions from the beginning
>                 consumer.seekToBeginning(partitions);
>             }
>         }
>         consumer.subscribe(Pattern.compile("^test.*$"), new 
> PartitionOffsetAssignerListener(consumer));
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 System.out.printf("offset = %d, key = %s, value = %s%n", 
> record.offset(), record.key(), record.value());
>             }
>         }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to