[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384490#comment-16384490 ]

Ted Yu commented on KAFKA-4879:
-------------------------------

What about #2 listed by Jason above? Should a KIP be drafted?

Thanks

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 2.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is
> this line
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       // position() blocks here once the topic has been deleted.
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
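The indefinite retry described in the report can be illustrated without a broker. The following is a minimal sketch, assuming a hypothetical `OffsetLookup` that keeps failing with a retriable error (a stand-in for UnknownTopicOrPartitionException); none of these class or method names come from Kafka. It shows why a timeout of `Long.MAX_VALUE` never gives up, while a finite timeout surfaces an error to the caller.

```java
/** Hypothetical illustration of a deadline-bounded retry loop; this is not Kafka's Fetcher code. */
final class BoundedRetry {

    /** Stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static final class RetriableLookupException extends RuntimeException {
        RetriableLookupException(String message) { super(message); }
    }

    /** Stand-in for the timeout error a bounded lookup would raise. */
    static final class LookupTimeoutException extends RuntimeException {
        LookupTimeoutException(String message) { super(message); }
    }

    /** An offset lookup that may fail with a retriable error. */
    interface OffsetLookup {
        long lookup();
    }

    /** Retries the lookup until it succeeds or timeoutMs has elapsed. */
    static long awaitOffset(OffsetLookup lookup, long timeoutMs, long backoffMs) {
        long startNanos = System.nanoTime();
        while (true) {
            try {
                return lookup.lookup();
            } catch (RetriableLookupException e) {
                long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
                // With timeoutMs == Long.MAX_VALUE this check never passes,
                // so the loop retries for as long as the lookup keeps failing.
                if (elapsedMs >= timeoutMs)
                    throw new LookupTimeoutException("gave up after " + elapsedMs + " ms: " + e.getMessage());
                sleep(Math.min(backoffMs, timeoutMs - elapsedMs));
            }
        }
    }

    /** Simulates a partition whose lookup fails the given number of times before succeeding. */
    static OffsetLookup failingFirst(int failures, long offset) {
        int[] remaining = {failures};
        return () -> {
            if (remaining[0]-- > 0)
                throw new RetriableLookupException("partition does not exist");
            return offset;
        };
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
    }

    public static void main(String[] args) {
        System.out.println("offset: " + awaitOffset(failingFirst(2, 42L), 1_000L, 1L));
        try {
            awaitOffset(failingFirst(Integer.MAX_VALUE, 0L), 20L, 5L);
        } catch (LookupTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The second call in `main` models a deleted topic that never comes back: with a 20 ms budget the caller regains control, which is the behaviour the reproducer cannot get from `position()`.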
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356998#comment-16356998 ]

Damian Guy commented on KAFKA-4879:
-----------------------------------

[~hachikuji] is this going to make it for 1.1?

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 1.1.0
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16301018#comment-16301018 ]

ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------

hachikuji commented on issue #3637: KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637#issuecomment-353532046

Closing this since I'm taking over the issue.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>             Fix For: 1.1.0
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16301019#comment-16301019 ]

ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------

hachikuji closed pull request #3637: KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badefd318..992b0711670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -116,9 +116,9 @@
     public void seekToEnd(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#position(TopicPartition)
+     * @see KafkaConsumer#position(TopicPartition, long)
      */
-    public long position(TopicPartition partition);
+    public long position(TopicPartition partition, long timeout);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3154061bf01..5e287f3b114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1079,8 +1079,9 @@ public void assign(Collection<TopicPartition> partitions) {
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
+        //TODO we need to clarify this
         if (!subscriptions.hasAllFetchPositions())
-            updateFetchPositions(this.subscriptions.missingFetchPositions());
+            updateFetchPositions(this.subscriptions.missingFetchPositions(), timeout);
 
         // if data is available already, return it immediately
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@@ -1295,6 +1296,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
      * Get the offset of the next record that will be fetched (if a record with that offset exists).
      *
      * @param partition The partition to get the position for
+     * @param timeout The amount of time to wait for the offset
      * @return The offset
      * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
      *             the partition
@@ -1305,8 +1307,10 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException: if the given partition is not available
      */
-    public long position(TopicPartition partition) {
+    public long position(TopicPartition partition, long timeout) {
         acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
@@ -1314,7 +1318,7 @@ public long position(TopicPartition partition) {
             Long offset = this.subscriptions.position(partition);
             if (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
-                updateFetchPositions(subscriptions.assignedPartitions());
+                updateFetchPositions(subscriptions.assignedPartitions(), timeout);
                 offset = this.subscriptions.position(partition);
             }
             return offset;
@@ -1645,15 +1649,18 @@ private void close(long timeoutMs, boolean swallowException) {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
+     * @param timeout The amount of time to wait for the offset
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available
      */
-    private void updateFetchPositions(Set<TopicPartition> partitions) {
+    private void updateFetchPositions(Set<TopicPartition> partitions, long timeout) {
         // lookup any positions for partitions which are awaiting reset (which may be the
         // case if the user called seekToBeginning or seekToEnd. We do this check first to
         // avoid an
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297995#comment-16297995 ]

Balint Molnar commented on KAFKA-4879:
--------------------------------------

[~jasong35] Sure, I reassigned to you.

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>             Fix For: 1.1.0
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297562#comment-16297562 ]

Jason Gustafson commented on KAFKA-4879:
----------------------------------------

[~baluchicken] It looks like progress on this issue has stalled. Do you mind if I pick it up?

I'm not sure we've reached consensus on the solution. The underlying issue is that the consumer blocks to find starting offsets for partitions which are assigned. If we are in {{poll()}}, then we will be stuck there until the partition is re-created. If we are in {{position()}} or one of the relative {{seek()}} methods, we will be similarly stuck.

1. In {{poll()}}, if we can't find the starting offset for a partition, I think we ought to begin fetching for other partitions and periodically recheck metadata in the background. We can potentially let the leader of the group rebalance if enough time passes and an assigned partition still doesn't exist. One question is whether we can propagate back to the user an UnknownTopicException at any point, but I think we can punt on this problem for now.
2. I think for the other methods, we have a choice between adding overloaded methods that accept a timeout parameter or adding a config like {{max.block.ms}} to match the producer. I'm somewhat partial to the first one since it is more flexible. If we're going to do a KIP for this, we may as well cover {{commitSync()}} and some of the other blocking APIs as well.

Thoughts?

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.1.0
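The two options in point 2 of Jason's comment are not mutually exclusive, which the following minimal sketch tries to show. It assumes a hypothetical helper, `effectiveTimeout`, that is not part of any Kafka API: a per-call timeout (the overloaded-method option) wins when supplied, and otherwise a client-wide setting is used (the config option). The key name `max.block.ms` is taken from the comment above; the 60000 ms fallback is an assumption chosen to mirror the producer's default.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch of combining a per-call timeout with a configured default. */
final class BlockingTimeouts {

    /** Config key named in the discussion; reusing it for the consumer is an assumption. */
    static final String MAX_BLOCK_MS = "max.block.ms";

    /** Fallback used when the key is absent; assumed here for illustration. */
    static final long ASSUMED_DEFAULT_MS = 60_000L;

    /**
     * Returns the timeout a blocking call would use: the per-call value if the
     * caller passed one (overload option), otherwise the configured value
     * (config option), otherwise the assumed default.
     */
    static Duration effectiveTimeout(Map<String, String> config, Duration perCall) {
        if (perCall != null)
            return perCall;
        String configured = config.get(MAX_BLOCK_MS);
        return Duration.ofMillis(configured == null ? ASSUMED_DEFAULT_MS : Long.parseLong(configured));
    }

    public static void main(String[] args) {
        Map<String, String> config = Collections.singletonMap(MAX_BLOCK_MS, "5000");
        // No per-call timeout: the configured value applies.
        System.out.println(effectiveTimeout(config, null));
        // Per-call timeout supplied: it overrides the configured value.
        System.out.println(effectiveTimeout(config, Duration.ofMillis(250)));
    }
}
```

The flexibility argument for overloads is visible here: a caller can shorten the wait for a single `position()` call without changing the client-wide setting.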
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189600#comment-16189600 ]

Ismael Juma commented on KAFKA-4879:
------------------------------------

This needs a KIP, so pushing to the next release.

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.1.0
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116707#comment-16116707 ]

ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------

GitHub user baluchicken opened a pull request:

    https://github.com/apache/kafka/pull/3637

    KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic

    @guozhangwang I created this PR, but it lacks a test. I do have the test, but I wanted to ask which test file would be the best candidate for it.
    Also, there is a TODO line which I added because that method already has a timeout parameter; how do you think we should proceed there?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/baluchicken/kafka-1 KAFKA-4879

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3637

commit 211b2118b0ec5fd7ff5b499ef17c92b0d31e76e3
Author: Balint Molnar
Date:   2017-08-07T14:54:34Z

    KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.0.0
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116589#comment-16116589 ]

Balint Molnar commented on KAFKA-4879:
--------------------------------------

[~guozhang] I debugged this issue; my findings are the following:
* The number of partitions of the topic needs to be greater than the num.partitions configuration value.
* This problem only occurs when delete.topic.enable is set to true.
* This problem only occurs when auto.create.topics.enable is set to true. (If it is set to false, then no matter how many partitions are set in num.partitions, the position call will hang indefinitely.)
* The main problem here is that, during the consumer.position call, the consumer will recreate the topic with num.partitions partitions, and if that is smaller than the original topic's partition count, the call will hang indefinitely.

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.0.0
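The partition-count condition in Balint's findings can be checked with simple arithmetic. The sketch below is hypothetical and involves no Kafka classes; it assumes the consumer was assigned every partition of the original topic and that an auto-created topic gets exactly num.partitions partitions numbered from 0. Under those assumptions it lists the assigned partitions that no longer exist after re-creation, which are the ones a position lookup would keep waiting on.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for reasoning about a topic re-created with fewer partitions. */
final class RecreatedTopicCheck {

    /**
     * Returns the partition numbers that were assigned before deletion but do not
     * exist once the topic has been re-created with recreatedCount partitions.
     */
    static List<Integer> missingPartitions(int originalCount, int recreatedCount) {
        List<Integer> missing = new ArrayList<>();
        for (int p = recreatedCount; p < originalCount; p++)
            missing.add(p);
        return missing;
    }

    public static void main(String[] args) {
        // Reproducer setup: 3 original partitions, re-created with num.partitions = 1.
        System.out.println("missing: " + missingPartitions(3, 1));
        // Single-partition topic: nothing is missing after re-creation.
        System.out.println("missing: " + missingPartitions(1, 1));
    }
}
```

This is consistent with the note in the reproducer that the issue disappears when the topic has only one partition: in that case every assigned partition still exists after auto-creation.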