[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297562#comment-16297562 ]
Jason Gustafson commented on KAFKA-4879:
----------------------------------------
[~baluchicken] It looks like progress on this issue has stalled. Do you mind if
I pick it up?
I'm not sure we've reached consensus on the solution. The underlying issue is
that the consumer blocks while looking up starting offsets for its assigned
partitions. If we are in {{poll()}}, we will be stuck there until the
partition is re-created. If we are in {{position()}} or one of the relative
{{seek()}} methods, we will be stuck in the same way.
1. In {{poll()}}, if we can't find the starting offset for a partition, I think
we ought to begin fetching for the other partitions and periodically recheck
metadata in the background. If enough time passes and an assigned partition
still doesn't exist, the group leader could trigger a rebalance.
One question is whether we can, at any point, propagate an
{{UnknownTopicOrPartitionException}} back to the user, but I think we can punt
on this problem for now.
2. I think for the other methods, we have a choice between adding overloaded
methods that accept a timeout parameter or adding a config like
{{max.block.ms}} to match the producer. I'm somewhat partial to the first one
since it is more flexible. If we're going to do a KIP for this, we may as well
cover {{commitSync()}} and some of the other blocking APIs as well.
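Option 1 amounts to making the offset lookup non-blocking per partition: partitions whose offsets resolve are fetched immediately, the rest are retried on later polls, and partitions that stay unresolved past a grace period are flagged. A minimal sketch of that idea, assuming plain string partition names, a caller-supplied lookup function, and a millisecond grace period (the class, its methods, and the grace-period policy are all hypothetical; this is not how Kafka's {{Fetcher}} is structured):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of option 1; NOT Kafka's Fetcher or its API.
// Partitions are plain strings and the offset lookup is a caller-supplied function.
public class PartialFetchSketch {
    private final List<String> assigned;
    // Stand-in for an offset lookup; Optional.empty() means "topic/partition unknown".
    private final Function<String, Optional<Long>> offsetLookup;
    // Starting offsets resolved so far.
    private final Map<String, Long> positions = new HashMap<>();
    // When each still-unresolved partition was first seen without an offset.
    private final Map<String, Long> missingSinceMs = new HashMap<>();

    public PartialFetchSketch(List<String> assigned, Function<String, Optional<Long>> offsetLookup) {
        this.assigned = new ArrayList<>(assigned);
        this.offsetLookup = offsetLookup;
    }

    // One poll iteration: try each unresolved partition once (no blocking) and
    // return the partitions that can be fetched from right now.
    public List<String> fetchablePartitions(long nowMs) {
        List<String> ready = new ArrayList<>();
        for (String p : assigned) {
            if (!positions.containsKey(p)) {
                Optional<Long> offset = offsetLookup.apply(p);
                if (offset.isPresent()) {
                    positions.put(p, offset.get());
                    missingSinceMs.remove(p);
                } else {
                    missingSinceMs.putIfAbsent(p, nowMs);
                }
            }
            if (positions.containsKey(p)) {
                ready.add(p);
            }
        }
        return ready;
    }

    // Partitions unresolved for at least graceMs; a group leader could use this
    // as a (hypothetical) signal to trigger a rebalance.
    public List<String> stalePartitions(long nowMs, long graceMs) {
        List<String> stale = new ArrayList<>();
        for (String p : assigned) {
            Long since = missingSinceMs.get(p);
            if (since != null && nowMs - since >= graceMs) {
                stale.add(p);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        // "test-1" stands for a partition of a deleted topic: its lookup never succeeds.
        PartialFetchSketch sketch = new PartialFetchSketch(
                List.of("test-0", "test-1", "test-2"),
                p -> p.equals("test-1") ? Optional.<Long>empty() : Optional.of(0L));
        System.out.println("fetchable: " + sketch.fetchablePartitions(0L));
        System.out.println("stale after 6s: " + sketch.stalePartitions(6_000L, 5_000L));
    }
}
```

Here fetching continues for {{test-0}} and {{test-2}} while {{test-1}} is only reported once it has been missing for the full grace period, rather than blocking the whole poll.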
Thoughts?
> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.0
> Reporter: Shixiong Zhu
> Assignee: Balint Molnar
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when a topic is deleted. The problem is
> this line:
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is {{Long.MAX_VALUE}}, so the lookup keeps retrying forever on
> {{UnknownTopicOrPartitionException}}.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>     public static void main(String[] args) {
>         // Make sure "delete.topic.enable" is set to true on the broker.
>         // Create the topic "test" with 3 partitions manually first.
>         // The issue does not reproduce when the topic has only one partition.
>         String topic = "test";
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "testgroup");
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("enable.auto.commit", "false");
>         KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>         kc.subscribe(Collections.singletonList(topic));
>         kc.poll(0);
>         Set<TopicPartition> partitions = kc.assignment();
>         System.out.println("partitions: " + partitions);
>         kc.pause(partitions);
>         kc.seekToEnd(partitions);
>         System.out.println("please delete the topic in 30 seconds");
>         try {
>             // Sleep 30 seconds to give us enough time to delete the topic.
>             Thread.sleep(30000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         System.out.println("sleep end");
>         for (TopicPartition p : partitions) {
>             // Hangs here: position() keeps retrying once the topic is gone.
>             System.out.println(p + " offset: " + kc.position(p));
>         }
>         System.out.println("cannot reach here");
>         kc.close();
>     }
> }
> {code}
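The reproducer hangs in {{position()}} because the offset lookup retries with a {{Long.MAX_VALUE}} timeout. Option 2 from the comment (a timeout overload, with the no-timeout variant falling back to a configured default in the spirit of the producer's {{max.block.ms}}) could look roughly like the following minimal sketch. It assumes a {{Supplier}} that returns {{Optional.empty()}} while the topic is unknown as a stand-in for the real offset request; {{BoundedLookupSketch}} and its methods are hypothetical names, not Kafka's API.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch of option 2; NOT Kafka's API. A Supplier stands in for
// the offset request, returning Optional.empty() while the topic is unknown.
public class BoundedLookupSketch {
    // Default used by the overload without a timeout, in the spirit of the
    // producer's max.block.ms (any consumer-side config name is hypothetical).
    private final Duration defaultTimeout;
    private final long retryBackoffMs;

    public BoundedLookupSketch(Duration defaultTimeout, long retryBackoffMs) {
        this.defaultTimeout = defaultTimeout;
        this.retryBackoffMs = retryBackoffMs;
    }

    // Overload without a timeout: bounded by the configured default instead of
    // Long.MAX_VALUE, so a deleted topic cannot block the caller forever.
    public long position(Supplier<Optional<Long>> lookup)
            throws TimeoutException, InterruptedException {
        return position(lookup, defaultTimeout);
    }

    // Retries the lookup with a fixed backoff until it succeeds or the timeout elapses.
    public long position(Supplier<Optional<Long>> lookup, Duration timeout)
            throws TimeoutException, InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<Long> offset = lookup.get();
            if (offset.isPresent()) {
                return offset.get();
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("offset lookup did not complete within " + timeout);
            }
            // Sleep for the backoff, but not past the deadline (at least 1 ms).
            long remainingMs = remainingNanos / 1_000_000;
            Thread.sleep(Math.max(1L, Math.min(retryBackoffMs, remainingMs)));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedLookupSketch consumer = new BoundedLookupSketch(Duration.ofMillis(200), 50);
        try {
            // Simulates the deleted topic: the lookup never succeeds.
            consumer.position(() -> Optional.<Long>empty());
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
        try {
            // Simulates a lookup that succeeds on the third attempt.
            int[] attempts = {0};
            long pos = consumer.position(
                    () -> ++attempts[0] < 3 ? Optional.<Long>empty() : Optional.of(42L),
                    Duration.ofSeconds(5));
            System.out.println("position = " + pos);
        } catch (TimeoutException e) {
            System.out.println("unexpected timeout");
        }
    }
}
```

Keeping both overloads gives callers per-call control while still bounding the old no-argument form, which is the flexibility argument made for overloads over a single config.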
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)