[jira] [Updated] (KAFKA-8100) kafka consumer not refresh metadata for dynamic topic deletion

2019-09-24 Thread Shengnan YU (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shengnan YU updated KAFKA-8100:
---
Description: 
Recently we used Flink to consume Kafka topics matching a regex pattern. We found that after we deleted some unused (outdated) topics, the logs kept flushing UNKNOWN_TOPIC_OR_PARTITION warnings until the program was restarted.

Studying the Kafka client source code, I found that for the consumer, topicExpiry is disabled in Metadata. As a result, even after a topic is deleted, the client still has that topic cached in the metadata's topic list and keeps fetching information about it from the brokers.
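
Since the consumer never expires these topics on its own, one possible mitigation (untested, and an assumption rather than anything verified in this report) is to periodically drop and re-create the pattern subscription so the client rebuilds its topic list from live cluster metadata. A minimal sketch, assuming {{unsubscribe()}} actually clears the cached topic set on the client version in use:

{code:java}
// Untested mitigation sketch (an assumption, not verified in this report):
// force the consumer to rebuild its metadata topic list by dropping and
// re-creating the pattern subscription.
static void resubscribe(KafkaConsumer<String, String> consumer,
                        Pattern pattern,
                        ConsumerRebalanceListener listener) {
    consumer.unsubscribe();                // drop subscription and cached assignment
    consumer.subscribe(pattern, listener); // re-match against the live topic set
}
{code}

Calling this periodically from the poll loop trades an extra consumer-group rebalance for dropping the stale topics.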

The following code can be used to reproduce the problem (create multiple topics such as "test1", "test2", "test3", ..., "testn" in the Kafka cluster, then delete one of them while the program is running).
{code:java}
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DeletedTopicRepro {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test10");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Very short metadata refresh interval so the warnings show up quickly.
        props.put("metadata.max.age.ms", "6");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Rebalance listener that rewinds every newly assigned partition.
        class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

            private final KafkaConsumer<String, String> consumer;

            PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
                this.consumer = kafkaConsumer;
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Read all assigned partitions from the beginning.
                consumer.seekToBeginning(partitions);
            }
        }

        consumer.subscribe(Pattern.compile("^test.*$"),
                new PartitionOffsetAssignerListener(consumer));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
{code}
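
To trigger the behavior, delete one of the matched topics while the loop above is polling, e.g. with the {{kafka-topics.sh --delete --topic test2}} tool shipped with Kafka (passing {{--zookeeper}} or {{--bootstrap-server}} as appropriate for the broker version); the client log should then repeat the unknown-topic warning on every metadata refresh until the process is restarted.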

  was:
Recently we used Flink to consume Kafka topics matching a regex pattern. We found that after we deleted some unused topics, the logs kept flushing UNKNOWN_TOPIC_OR_PARTITION warnings.

Studying the Kafka client source code, I found that for the consumer, topicExpiry is disabled in Metadata. As a result, even after a topic is deleted, the client still has that topic in the metadata's topic list and keeps fetching from the brokers.

Is there any good way to avoid these noisy warning logs without modifying Kafka's source code? (We still need the 'real' unknown-topic exceptions in the logs, i.e. those not caused by outdated topics.)

The following code can be used to reproduce the problem (create multiple topics such as "test1", "test2", "test3", ..., "testn" in the Kafka cluster, then delete one of them while the program is running); see the code block in the updated description above.

[jira] [Updated] (KAFKA-8100) kafka consumer not refresh metadata for dynamic topic deletion

2019-04-15 Thread Shengnan YU (JIRA)


 [ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shengnan YU updated KAFKA-8100:
---
Summary: kafka consumer not refresh metadata for dynamic topic deletion
(was: If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log)

> kafka consumer not refresh metadata for dynamic topic deletion
> --
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)