[jira] [Comment Edited] (KAFKA-8100) If an expired topic is deleted, the Kafka consumer keeps flushing unknown_topic warnings in the log

2019-04-15 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817951#comment-16817951
 ] 

Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM:
-

Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? The topic can easily be discovered 
again if it actually exists.


was (Author: ysn2233):
Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? The topic can easily be discovered 
if it actually exists.
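
To make the comment's suggestion concrete, here is a minimal sketch of the proposed behavior. It is illustration only: MetadataCache, onMetadataError, and topicsToFetch are hypothetical names, not the real org.apache.kafka.clients.Metadata internals. The idea is that an UNKNOWN_TOPIC_OR_PARTITION error evicts the topic from the cached topic list, and a later pattern-subscription metadata refresh re-adds it if the topic actually exists.

{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the client's cached topic list, for illustration only.
class MetadataCache {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    void add(String topic) {
        topics.add(topic);
    }

    // Proposed handling: on UNKNOWN_TOPIC_OR_PARTITION, drop the topic so the
    // client stops asking the brokers for it. If the topic actually exists,
    // the next metadata refresh for the subscription pattern would discover
    // it again and re-add it.
    void onMetadataError(String topic, String error) {
        if ("UNKNOWN_TOPIC_OR_PARTITION".equals(error)) {
            topics.remove(topic);
        }
    }

    Set<String> topicsToFetch() {
        return Collections.unmodifiableSet(new HashSet<>(topics));
    }
}
{code}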

> If an expired topic is deleted, the Kafka consumer keeps flushing 
> unknown_topic warnings in the log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We found 
> that when we deleted some unused topics, the logs kept flushing 
> UNKNOWN_TOPIC_OR_PARTITION warnings.
> I studied the source code of the Kafka client and found that, for the 
> consumer, topicExpiry is disabled in Metadata. As a result, even after a 
> topic is deleted, the client still keeps that topic in its metadata topic 
> list and keeps fetching it from the servers.
> Is there any good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown-topic warnings in the 
> logs, i.e. for topics that never existed, not for topics that were deleted.)
> The following code can be used to reproduce the problem (create multiple 
> topics such as "test1", "test2", "test3" ... "testn" in the Kafka cluster and 
> then delete any one of them while the program is running).
> {code:java}
> import java.util.Collection;
> import java.util.Properties;
> import java.util.regex.Pattern;
> 
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> public class Repro {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Very short metadata refresh so the deleted topic is re-requested quickly.
>         props.put("metadata.max.age.ms", "6");
> 
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> 
>         class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>             private final KafkaConsumer<String, String> consumer;
> 
>             public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>                 this.consumer = kafkaConsumer;
>             }
> 
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             }
> 
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 // Read all newly assigned partitions from the beginning.
>                 consumer.seekToBeginning(partitions);
>             }
>         }
> 
>         consumer.subscribe(Pattern.compile("^test.*$"),
>                 new PartitionOffsetAssignerListener(consumer));
> 
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 System.out.printf("offset = %d, key = %s, value = %s%n",
>                         record.offset(), record.key(), record.value());
>             }
>         }
>     }
> }
> {code}
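
Regarding the question above about avoiding these warnings without modifying Kafka's source code: a common workaround is to raise the log level of the client logger that emits the warning, org.apache.kafka.clients.NetworkClient. This is a blunt instrument, since it also hides warnings for genuinely unknown topics, which the report explicitly wants to keep. A minimal sketch, assuming a log4j 1.x binding on the classpath (SuppressMetadataWarnings is just a hypothetical wrapper class):

{code:java}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class SuppressMetadataWarnings {
    public static void main(String[] args) {
        // The repeated "Error while fetching metadata ... UNKNOWN_TOPIC_OR_PARTITION"
        // line is logged at WARN by the NetworkClient logger. Raising its level
        // to ERROR silences it for deleted topics, but also for real unknown
        // topics, so this is a stopgap rather than a fix.
        Logger.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.ERROR);

        // ... build and run the consumer as in the reproduction code above ...
    }
}
{code}

The same effect can be achieved declaratively by setting log4j.logger.org.apache.kafka.clients.NetworkClient=ERROR in log4j.properties; either way, a real fix would have to land in the client, which is what this ticket tracks.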



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

