[
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105480#comment-14105480
]
Yiyang Li commented on KAFKA-1029:
----------------------------------
Hello, I am totally new to Kafka. We are .Net developers testing Kafka with the
kafka-net library. The NuGet package is still an alpha version, so a lot of
functionality is not implemented yet.
We recently hit this problem when we embedded a producer in a service; according
to the library, it performs a ResponseTimeoutCheck every 30000 ms.
However, one of the clients causes the following error (the others are fine):
ERROR Closing socket for /10.207.x.x because of error (kafka.network.Processor)
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:217)
    at kafka.network.Processor.write(SocketServer.scala:375)
    at kafka.network.Processor.run(SocketServer.scala:247)
    at java.lang.Thread.run(Thread.java:745)
log4j:ERROR Failed to rename [/cygdrive/c/kafka/bin/../logs/server.log] to [/cygdrive/c/kafka/bin/../logs/server.log.2014-08-20-17].
This appears in the output of kafka-server-start.sh.
I have changed the log4j properties to:
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
Other background:
Kafka binaries: Scala 2.9.2 - kafka_2.9.2-0.8.1.1
Brokers: 3 brokers on 3 different machines, using the same port on each IP
(broker id = 0, 1, 2)
Zookeeper: port 2181 on one of the brokers
I understand that this may be hard to reproduce, as the problem may lie in the
incomplete .Net client.
Details of ResponseTimeoutCheck:
https://github.com/YiyangLi/kafka-net/blob/master/src/kafka-net/KafkaConnection.cs
(Line 200)
Thanks.
> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.0
> Reporter: Sam Meder
> Assignee: Sam Meder
> Priority: Blocker
> Fix For: 0.8.0
>
> Attachments:
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3,
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2,
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a
> while back in a different session, hence I will backoff for this node to be
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>         electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) =>
>           KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it
> looks like a new registration will be added on every elect call - shouldn't it
> register in startup()?), so it can update leaderId to the current leader before
> the call to create. If that happens, we will continuously get node-exists
> exceptions and the checker function will always return true, i.e. we will
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when
> calling create, i.e.
>   createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>     electionPath, electString, brokerId,
>     (controllerString : String, leaderId : Any) =>
>       KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>     controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the
> broker that owned the node previously, although I am still not 100% sure if
> that is sufficient.
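
For readers following the quoted description, the race can be illustrated with
a small toy model. This is only a sketch, not the Kafka code: there is no
ZooKeeper involved, the names ElectionRaceSketch, create, storedOwner and
retriesLeft are made up for illustration, and the retry budget exists only so
the example terminates (the loop described in the issue is a while(true)). It
assumes the node at the election path is owned by a live controller (broker 2)
and therefore never goes away, and that the listener has already set leaderId
to 2 on broker 3 before the create call runs.

```scala
// Toy model of the retry loop described in the quoted issue.
// No ZooKeeper is involved; every name here is hypothetical.
object ElectionRaceSketch {
  sealed trait Outcome
  case object Created extends Outcome       // node did not exist, create succeeded
  case object Conflict extends Outcome      // node is owned by someone else, give up
  case object StillRetrying extends Outcome // budget exhausted; the loop in the issue has no budget

  // storedOwner: broker id currently stored at the election path, if any.
  // expectedCallerData: the id handed to the checker (leaderId or brokerId).
  @annotation.tailrec
  def create(storedOwner: Option[Int], expectedCallerData: Int, retriesLeft: Int): Outcome =
    storedOwner match {
      case None                                       => Created
      case Some(owner) if owner != expectedCallerData => Conflict
      case Some(_) if retriesLeft <= 0                => StillRetrying
      // Checker returned true: "back off" and retry against the same stored node.
      case Some(_) => create(storedOwner, expectedCallerData, retriesLeft - 1)
    }

  def main(args: Array[String]): Unit = {
    val brokerId = 3          // the broker running elect
    val storedOwner = Some(2) // broker 2 is the live controller
    val leaderId = 2          // already updated by the listener before create runs

    // Passing leaderId: the checker matches the stored node, so the loop keeps retrying.
    println(create(storedOwner, leaderId, retriesLeft = 5)) // prints "StillRetrying"
    // Passing brokerId: the checker does not match, so the conflict is reported at once.
    println(create(storedOwner, brokerId, retriesLeft = 5)) // prints "Conflict"
  }
}
```

In this model, passing brokerId means only the broker whose id matches the
stored node ever enters the back-off path, which is the behaviour the proposed
change aims for.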