[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-4879: ------------------------------- Fix Version/s: (was: 1.0.0) 1.1.0 > KafkaConsumer.position may hang forever when deleting a topic > ------------------------------------------------------------- > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.2.0 > Reporter: Shixiong Zhu > Assignee: Balint Molnar > Fix For: 1.1.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition. > String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer kc = new KafkaConsumer(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set<TopicPartition> partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(30000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)