[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2018-03-02 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384490#comment-16384490
 ] 

Ted Yu commented on KAFKA-4879:
---

What about #2 listed by Jason above ?
Should a KIP be drafted ?

Thanks

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2018-02-08 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356998#comment-16356998
 ] 

Damian Guy commented on KAFKA-4879:
---

[~hachikuji] is this going to make it for 1.1?

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16301018#comment-16301018
 ] 

ASF GitHub Bot commented on KAFKA-4879:
---

hachikuji commented on issue #3637: KAFKA-4879 KafkaConsumer.position may hang 
forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637#issuecomment-353532046
 
 
   Closing this since I'm taking over the issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16301019#comment-16301019
 ] 

ASF GitHub Bot commented on KAFKA-4879:
---

hachikuji closed pull request #3637: KAFKA-4879 KafkaConsumer.position may hang 
forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badefd318..992b0711670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -116,9 +116,9 @@
 public void seekToEnd(Collection partitions);
 
 /**
- * @see KafkaConsumer#position(TopicPartition)
+ * @see KafkaConsumer#position(TopicPartition, long)
  */
-public long position(TopicPartition partition);
+public long position(TopicPartition partition, long timeout);
 
 /**
  * @see KafkaConsumer#committed(TopicPartition)
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3154061bf01..5e287f3b114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1079,8 +1079,9 @@ public void assign(Collection partitions) 
{
 
 // fetch positions if we have partitions we're subscribed to that we
 // don't know the offset for
+//TODO we need to clarify this
 if (!subscriptions.hasAllFetchPositions())
-updateFetchPositions(this.subscriptions.missingFetchPositions());
+updateFetchPositions(this.subscriptions.missingFetchPositions(), 
timeout);
 
 // if data is available already, return it immediately
 Map>> records = 
fetcher.fetchedRecords();
@@ -1295,6 +1296,7 @@ public void seekToEnd(Collection 
partitions) {
  * Get the offset of the next record that will be fetched (if a 
record with that offset exists).
  *
  * @param partition The partition to get the position for
+ * @param timeout The amount of time to wait for the offset
  * @return The offset
  * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no 
offset is currently defined for
  * the partition
@@ -1305,8 +1307,10 @@ public void seekToEnd(Collection 
partitions) {
  * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
  * configured groupId
  * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+ *
+ * @throws org.apache.kafka.common.errors.TimeoutException: if the given 
partition is not available
  */
-public long position(TopicPartition partition) {
+public long position(TopicPartition partition, long timeout) {
 acquireAndEnsureOpen();
 try {
 if (!this.subscriptions.isAssigned(partition))
@@ -1314,7 +1318,7 @@ public long position(TopicPartition partition) {
 Long offset = this.subscriptions.position(partition);
 if (offset == null) {
 // batch update fetch positions for any partitions without a 
valid position
-updateFetchPositions(subscriptions.assignedPartitions());
+updateFetchPositions(subscriptions.assignedPartitions(), 
timeout);
 offset = this.subscriptions.position(partition);
 }
 return offset;
@@ -1645,15 +1649,18 @@ private void close(long timeoutMs, boolean 
swallowException) {
  * or reset it using the offset reset policy the user has configured.
  *
  * @param partitions The partitions that needs updating fetch positions
+ * @param timeout The amount of time to wait for the offset
  * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
  * defined
+ *
+ * @throws org.apache.kafka.common.errors.TimeoutException: if one of the 
given partition is not available
  */
-private void updateFetchPositions(Set partitions) {
+private void updateFetchPositions(Set partitions, long 
timeout) {
 // lookup any positions for partitions which are awaiting reset (which 
may be the
 // case if the user called seekToBeginning or seekToEnd. We do this 
check first to
 // avoid an 

[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-19 Thread Balint Molnar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297995#comment-16297995
 ] 

Balint Molnar commented on KAFKA-4879:
--

[~jasong35] Sure, I reassigned to you.

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-12-19 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297562#comment-16297562
 ] 

Jason Gustafson commented on KAFKA-4879:


[~baluchicken] It looks like progress on this issue has stalled. Do you mind if 
I pick it up?

I'm not sure we've reached consensus on the solution. The underlying issue is 
that the consumer blocks to find starting offsets for partitions which are 
assigned. If we are in {{poll()}}, then we will be stuck there until the 
partition is re-created. If we are in {{position()}} or one of the relative 
{{seek()}} methods, we will be similarly stuck. 

1. In {{poll()}}, if we can't find the starting offset for a partition, I think 
we ought to begin fetching for other partitions and periodically recheck 
metadata in the background. We can potentially let the leader of the group 
rebalance if enough time passes and an assigned partition still doesn't exist. 
One question is whether we can propagate back to the user an 
UnknownTopicException at any point, but I think we can punt on this problem for 
now.

2. I think for the other methods, we have a choice between adding overloaded 
methods that accept a timeout parameter or adding a config like 
{{max.block.ms}} to match the producer. I'm somewhat partial to the first one 
since it is more flexible. If we're going to do a KIP for this, we may as well 
cover {{commitSync()}} and some of the other blocking APIs as well.

Thoughts?

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Balint Molnar
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-10-03 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189600#comment-16189600
 ] 

Ismael Juma commented on KAFKA-4879:


This needs a KIP, so pushing to the next release.

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Balint Molnar
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116707#comment-16116707
 ] 

ASF GitHub Bot commented on KAFKA-4879:
---

GitHub user baluchicken opened a pull request:

https://github.com/apache/kafka/pull/3637

KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic

@guozhangwang  I created this PR but it lacks the test. I also has the test 
but I wanted to ask which Test file should be the best candidate for this?
Also there is a TODO line which I created because that method already has a 
timeout parameter, what do you think how can we proceed there?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baluchicken/kafka-1 KAFKA-4879

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3637


commit 211b2118b0ec5fd7ff5b499ef17c92b0d31e76e3
Author: Balint Molnar 
Date:   2017-08-07T14:54:34Z

KAFKA-4879 KafkaConsumer.position may hang forever when deleting a topic




> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Balint Molnar
> Fix For: 1.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2017-08-07 Thread Balint Molnar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116589#comment-16116589
 ] 

Balint Molnar commented on KAFKA-4879:
--

[~guozhang] I debugged this issue, my findings are the following:
* The partition size for the topic is need to be bigger than num.partitions 
configuration value.
* This problem only occurs when delete.topic.enable is set to true.
* This problem only occurs when auto.create.topics.enable set to true. (if this 
one is set to false then it does not matter how many partitions are set in 
num.partitions the position will hang indefinitely.)
* The main problem here is: during the consumer.position call the consumer will 
recreate the topic with the num.partitions partitions and if it is smaller than 
the original topic's partition num it will hang indefinitely. 

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Balint Molnar
> Fix For: 1.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(3);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)