[
https://issues.apache.org/jira/browse/KAFKA-824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13979305#comment-13979305
]
Artur Denysenko commented on KAFKA-824:
---------------------------------------
I think this is a case where "_connection" is null for some reason:
https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L308
Without digging into the root cause, I would override "ZkClient.create" with
my own method that checks "_connection" for null:
{code:java}
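import java.util.concurrent.Callable;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.ConnectionLossException;

public class KafkaZkClient extends ZkClient {
    // Constructors delegating to ZkClient are omitted here.

    @Override
    public String create(final String path, Object data, final CreateMode mode)
            throws ZkInterruptedException, IllegalArgumentException, ZkException,
            RuntimeException {
        if (path == null) {
            throw new NullPointerException("path must not be null.");
        }
        final byte[] bytes = data == null ? null : serialize(data);
        return retryUntilConnected(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // FIX HERE: report a missing connection as a connection loss
                // so that retryUntilConnected can retry instead of hitting an NPE.
                if (_connection == null) {
                    throw new ConnectionLossException();
                }
                return _connection.create(path, bytes, mode);
            }
        });
    }
}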
{code}
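To make the idea behind the null check easier to try without ZooKeeper, here is a minimal self-contained sketch. Everything in it is a hypothetical stand-in (NullGuardRetryDemo, ConnectionLoss, Connection, and the maxAttempts parameter are not zkclient names), and the retry loop is a simplification, not zkclient's actual retryUntilConnected. It only shows that throwing a "connection loss" exception from the guarded call lets the loop retry, where an unguarded call would fail with a NullPointerException.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration only; none of these are zkclient classes.
final class NullGuardRetryDemo {

    /** Stand-in for ZooKeeper's ConnectionLossException. */
    static final class ConnectionLoss extends Exception {}

    /** Stand-in for the connection object that may not be set yet. */
    interface Connection {
        String create(String path);
    }

    // volatile so that a reference set by another thread is visible to the retry loop
    private volatile Connection connection;

    // Number of attempts made by the most recent retryUntilConnected call.
    int attempts = 0;

    void setConnection(Connection c) {
        connection = c;
    }

    /** Simplified retry loop: retries only on ConnectionLoss; other exceptions propagate. */
    <T> T retryUntilConnected(Callable<T> callable, int maxAttempts) throws Exception {
        for (int i = 1; ; i++) {
            attempts = i;
            try {
                return callable.call();
            } catch (ConnectionLoss e) {
                if (i >= maxAttempts) {
                    throw e; // give up after the assumed attempt limit
                }
                Thread.sleep(10); // brief pause before the next attempt
            }
        }
    }

    String create(final String path, int maxAttempts) throws Exception {
        return retryUntilConnected(() -> {
            Connection c = connection; // read the field once to avoid a check-then-use race
            if (c == null) {
                throw new ConnectionLoss(); // the guard: turn "null" into a retryable condition
            }
            return c.create(path);
        }, maxAttempts);
    }

    public static void main(String[] args) throws Exception {
        NullGuardRetryDemo demo = new NullGuardRetryDemo();
        demo.setConnection(path -> "created:" + path);
        System.out.println(demo.create("/consumers/demo", 3));
    }
}
```

Note that the sketch copies the field into a local variable before the null check. A check directly on the shared field, followed by a second read for the call, could still see null if another thread clears it in between.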
> java.lang.NullPointerException in commitOffsets
> ------------------------------------------------
>
> Key: KAFKA-824
> URL: https://issues.apache.org/jira/browse/KAFKA-824
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.7.2
> Reporter: Yonghui Zhao
> Assignee: Neha Narkhede
> Attachments: ZkClient.0.3.txt, ZkClient.0.4.txt
>
>
> Neha Narkhede
> "Yes, I have. Unfortunately, I never quite got around to fixing it. My guess
> is that it is caused by a race condition between the rebalance thread and
> the offset commit thread when a rebalance is triggered or the client is
> being shut down. Do you mind filing a bug?"
> 2013/03/25 12:08:32.020 WARN [ZookeeperConsumerConnector] [] 0_lu-ml-test10.bj-1364184411339-7c88f710 exception during commitOffsets
> java.lang.NullPointerException
> at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:103)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:251)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:248)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:570)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:248)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:246)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.Pool$$anon$1.foreach(Pool.scala:53)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.utils.Pool.foreach(Pool.scala:24)
> at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:246)
> at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:232)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:126)
> at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)