[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817951#comment-16817951 ] Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM: - Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered again if the topic actually exists. was (Author: ysn2233): Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if the topic actually exists. > If delete expired topic, kafka consumer will keep flushing unknown_topic > warning in log > --- > > Key: KAFKA-8100 > URL: https://issues.apache.org/jira/browse/KAFKA-8100 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1, 2.1.1 >Reporter: Shengnan YU >Priority: Major > > Recently we used flink to consume kafka topics with a regex pattern. It is > found that when we deleted some unused topics, the logs will keep flushing > UNKNOWN_TOPIC_EXCEPTION. > I study the source code of kafka client, it is found that for consumer, > topicExpiry is disable in Metadata, which leads to that the even the topic > deleted, the client still have this topic info in the metadata's topic list > and keep fetching from servers. > Is there any good method to avoid this annoying warning logs without modify > the kafka's source code? (We still need the 'Real' Unknown topic exception, > which means not the outdated topic, in logs) > The following code can be used to reproduce this problem (if you create > multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster > and then delete any of one while running). > {code:java} > public static void main(String [] args) { > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092\n"); > props.put("group.id", "test10"); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put("auto.offset.reset", "earliest"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("metadata.max.age.ms", "6"); > KafkaConsumer consumer = new KafkaConsumer String>(props); > class PartitionOffsetAssignerListener implements > ConsumerRebalanceListener { > private KafkaConsumer consumer; > public PartitionOffsetAssignerListener(KafkaConsumer > kafkaConsumer) { > this.consumer = kafkaConsumer; > } > public void onPartitionsRevoked(Collection > partitions) { > } > public void onPartitionsAssigned(Collection > partitions) { > //reading all partitions from the beginning > consumer.seekToBeginning(partitions); > } > } > consumer.subscribe(Pattern.compile("^test.*$"), new > PartitionOffsetAssignerListener(consumer)); > while (true) { > ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > System.out.printf("offset = %d, key = %s, value = %s%n", > record.offset(), record.key(), record.value()); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817951#comment-16817951 ] Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM: - Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if the topic actually exists. was (Author: ysn2233): Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily recoverable if the topic actually exists. > If delete expired topic, kafka consumer will keep flushing unknown_topic > warning in log > --- > > Key: KAFKA-8100 > URL: https://issues.apache.org/jira/browse/KAFKA-8100 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1, 2.1.1 >Reporter: Shengnan YU >Priority: Major > > Recently we used flink to consume kafka topics with a regex pattern. It is > found that when we deleted some unused topics, the logs will keep flushing > UNKNOWN_TOPIC_EXCEPTION. > I study the source code of kafka client, it is found that for consumer, > topicExpiry is disable in Metadata, which leads to that the even the topic > deleted, the client still have this topic info in the metadata's topic list > and keep fetching from servers. > Is there any good method to avoid this annoying warning logs without modify > the kafka's source code? (We still need the 'Real' Unknown topic exception, > which means not the outdated topic, in logs) > The following code can be used to reproduce this problem (if you create > multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster > and then delete any of one while running). > {code:java} > public static void main(String [] args) { > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092\n"); > props.put("group.id", "test10"); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put("auto.offset.reset", "earliest"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("metadata.max.age.ms", "6"); > KafkaConsumer consumer = new KafkaConsumer String>(props); > class PartitionOffsetAssignerListener implements > ConsumerRebalanceListener { > private KafkaConsumer consumer; > public PartitionOffsetAssignerListener(KafkaConsumer > kafkaConsumer) { > this.consumer = kafkaConsumer; > } > public void onPartitionsRevoked(Collection > partitions) { > } > public void onPartitionsAssigned(Collection > partitions) { > //reading all partitions from the beginning > consumer.seekToBeginning(partitions); > } > } > consumer.subscribe(Pattern.compile("^test.*$"), new > PartitionOffsetAssignerListener(consumer)); > while (true) { > ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > System.out.printf("offset = %d, key = %s, value = %s%n", > record.offset(), record.key(), record.value()); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)