[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297562#comment-16297562
 ] 

Jason Gustafson commented on KAFKA-4879:
----------------------------------------

[~baluchicken] It looks like progress on this issue has stalled. Do you mind if 
I pick it up?

I'm not sure we've reached consensus on the solution. The underlying issue is 
that the consumer blocks indefinitely while looking up the starting offsets of 
its assigned partitions. If we are in {{poll()}}, we will be stuck there until 
the partition is re-created. If we are in {{position()}} or one of the relative 
{{seek()}} methods, we will be similarly stuck. 

1. In {{poll()}}, if we can't find the starting offset for a partition, I think 
we ought to begin fetching for other partitions and periodically recheck 
metadata in the background. We can potentially let the leader of the group 
rebalance if enough time passes and an assigned partition still doesn't exist. 
One question is whether we can propagate an UnknownTopicOrPartitionException 
back to the user at any point, but I think we can punt on this problem for 
now.

2. I think for the other methods, we have a choice between adding overloaded 
methods that accept a timeout parameter or adding a config like 
{{max.block.ms}} to match the producer. I'm somewhat partial to the first one 
since it is more flexible. If we're going to do a KIP for this, we may as well 
cover {{commitSync()}} and some of the other blocking APIs as well.
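
To make the timeout-overload idea in point 2 concrete, here is a minimal sketch of the behaviour such a method might have. It is not the consumer's implementation: {{awaitPosition}}, {{PositionTimeoutException}}, and the {{lookup}} supplier are hypothetical names, and the supplier merely stands in for a single non-blocking attempt to resolve a partition's offset.

```java
import java.time.Duration;
import java.util.OptionalLong;
import java.util.function.Supplier;

public class BoundedPositionSketch {

    // Hypothetical unchecked exception raised when the offset cannot be resolved in time.
    static class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(String message) {
            super(message);
        }
    }

    // Retries a non-blocking offset lookup until it succeeds or the timeout elapses.
    // `lookup` is an assumed stand-in for one attempt to find a partition's offset;
    // it returns an empty OptionalLong while the offset is still unknown.
    static long awaitPosition(Supplier<OptionalLong> lookup, Duration timeout, Duration backoff) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            OptionalLong offset = lookup.get();
            if (offset.isPresent()) {
                return offset.getAsLong();
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new PositionTimeoutException("offset not resolved within " + timeout);
            }
            // Sleep for the backoff, but never past the deadline.
            long sleepMs = Math.min(backoff.toMillis(), Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for position", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulates a deleted topic: the lookup never resolves, so the call gives up.
        try {
            awaitPosition(OptionalLong::empty, Duration.ofMillis(200), Duration.ofMillis(50));
        } catch (PositionTimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }
        // Simulates a healthy partition: the lookup resolves on the first attempt.
        System.out.println(awaitPosition(() -> OptionalLong.of(42L),
                Duration.ofMillis(200), Duration.ofMillis(50)));
    }
}
```

A per-call timeout like this lets each caller pick its own bound, which is the flexibility argument for overloads over a single {{max.block.ms}}-style config.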

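The {{poll()}} behaviour described in point 1 could look roughly like the sketch below. It assumes a simplified model in which partitions are plain strings and a map holds the starting offsets resolved so far; {{fetchable}}, {{shouldRebalance}}, and the grace period are hypothetical and do not correspond to existing consumer internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialFetchSketch {

    // Returns the assigned partitions whose starting offset is already known.
    // Partitions missing from `positions` are skipped for this round rather
    // than blocking the whole poll; they would be rechecked after a metadata refresh.
    static List<String> fetchable(Collection<String> assigned, Map<String, Long> positions) {
        List<String> ready = new ArrayList<>();
        for (String partition : assigned) {
            if (positions.containsKey(partition)) {
                ready.add(partition);
            }
        }
        return ready;
    }

    // True once a partition has stayed unresolved for at least the (hypothetical)
    // grace period, at which point the group leader could trigger a rebalance.
    static boolean shouldRebalance(long unresolvedSinceMs, long nowMs, long gracePeriodMs) {
        return nowMs - unresolvedSinceMs >= gracePeriodMs;
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("test-0", "test-1", "test-2");
        Map<String, Long> positions = new HashMap<>();
        positions.put("test-0", 10L);
        positions.put("test-2", 7L);
        // "test-1" has no known offset yet, so it is left out of this fetch round.
        System.out.println("fetch from: " + fetchable(assigned, positions));
        System.out.println("rebalance: " + shouldRebalance(0L, 60_000L, 30_000L));
    }
}
```
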
Thoughts?

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
