Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-13 Thread Jun Rao
In 0.8, ZK is critical for managing the brokers, so we do expect the ZK
service to be always available.

Thanks,

Jun




Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-13 Thread Guozhang Wang
Hi Yury,

When auto-commit is turned on, the consumer needs to commit its last offsets
to ZK before it shuts down completely, so that it does not consume duplicates
when it starts again. If ZK is not up and running, the shutdown would rather
get stuck than proceed and incur duplicates. This behavior is by design: for
consistent offset maintenance, ZK should be up all the time.

Guozhang
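
For readers who still need the calling thread to return when ZK is down, one possible workaround is to run the blocking shutdown on a separate daemon thread and stop waiting after a deadline. The following is only a sketch under stated assumptions: the class BoundedShutdown and the method runWithTimeout are hypothetical names, not part of Kafka, and the Runnable stands in for the connector's shutdown call. Nothing in this thread establishes whether the wait inside ZkClient reacts to interruption, so the stuck thread may linger in the background. Offsets that were not committed can lead to duplicates on restart, which is exactly the trade-off described above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka.
final class BoundedShutdown {

    /**
     * Runs a potentially blocking shutdown action on a daemon thread and waits
     * at most timeoutMillis for it. Returns true if the action finished in time.
     */
    static boolean runWithTimeout(Runnable shutdownAction, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-shutdown");
            t.setDaemon(true); // a stuck shutdown must not keep the JVM alive
            return t;
        });
        Future<?> result = executor.submit(shutdownAction);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker; has no effect if the action ignores interrupts.
            result.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("shutdown action failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a shutdown call that blocks on an unreachable ZooKeeper.
        Runnable stuck = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // woken up by cancel(true)
            }
        };
        System.out.println(runWithTimeout(stuck, 200));     // prints false
        System.out.println(runWithTimeout(() -> { }, 200)); // prints true
    }
}
```

In an application, the Runnable would wrap the connector's shutdown call, and a false return value would be logged as "offsets possibly not committed".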




Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-12 Thread Yury Ruchin
Hi all,

I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector.
Steps to reproduce:

1. Start a single-broker Kafka cluster with auto.create.topics.enable set to
true.
2. Start a ConsumerConnector on a topic (which does not yet exist) with
auto.offset.reset set to smallest.
3. Produce some data to the topic.
4. Bring ZooKeeper down.
5. Call ConsumerConnector.close().

Observation - the call blocks forever with the following stack trace:

   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for 0xc6bf0570 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)
        at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
        at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
        at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
        at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
        at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at kafka.utils.Pool$$anon$1.foreach(Unknown Source)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at kafka.utils.Pool.foreach(Unknown Source)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
        - locked 0xc6be0c60 (a java.lang.Object)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
        ...

Once I set auto.commit.enable to false, the problem is gone. This is
identical to what's described in
https://issues.apache.org/jira/browse/KAFKA-601, with the only difference
that it applies to Kafka 0.8 rather than Kafka 0.7.2. Is there any way to
solve this issue other than disabling auto-commit?

Thanks,
Yury
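
For reference, the workaround mentioned in the message above (disabling auto-commit) could be expressed in the consumer properties roughly as follows. This is a minimal sketch: the class name ManualCommitConfig and the ZooKeeper address and group id passed in main are placeholders, not values from this thread. Only the property keys are taken from the 0.8 consumer configuration. With auto-commit disabled, offsets would have to be committed explicitly (for example via the connector's commitOffsets()), and such a commit presumably still needs ZK to be reachable.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties with auto-commit disabled.
final class ManualCommitConfig {

    static Properties consumerProperties(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);   // ZooKeeper connection string
        props.put("group.id", groupId);              // consumer group
        props.put("auto.offset.reset", "smallest");  // as in the steps to reproduce
        props.put("auto.commit.enable", "false");    // shutdown no longer commits offsets to ZK
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id, chosen only for illustration.
        Properties props = consumerProperties("localhost:2181", "demo-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```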