[
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13984265#comment-13984265
]
Michael Noll commented on KAFKA-1029:
-------------------------------------
Interestingly I ran into the very same issue while doing basic validation of
Kafka 0.8.1.1.
Here is an example log message, which was repeated indefinitely:
{code}
[2014-04-29 09:48:27,207] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398764901156"} stored data: {"version":1,"brokerid":0,"timestamp":"1398764894941"} (kafka.utils.ZkUtils$)
[2014-04-29 09:48:27,218] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398764901156"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}
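For context: these two lines come from the retry helper in {{kafka.utils.ZkUtils}} ({{createEphemeralPathExpectConflictHandleZKBug}}, also referenced in the issue description below). The following is a simplified paraphrase of that loop, not the verbatim 0.8.1 source, and the method name here is my own; it shows why the messages repeat indefinitely: as long as the checker claims the conflicting node was written by this caller, the code only backs off and retries, waiting for ZooKeeper to expire the old session.
{code}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.utils.ZkUtils

// Simplified paraphrase of the retry loop; the real method lives in kafka.utils.ZkUtils.
object ZkRetrySketch {
  def createEphemeralRetryingOnConflict(zkClient: ZkClient,
                                        path: String,
                                        data: String,
                                        expectedCallerData: Any,
                                        checker: (String, Any) => Boolean,
                                        backoffTimeMs: Int): Unit = {
    while (true) {
      try {
        // Logs "conflict in <path> data: ... stored data: ..." when the node already exists.
        ZkUtils.createEphemeralPathExpectConflict(zkClient, path, data)
        return
      } catch {
        case _: ZkNodeExistsException =>
          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
            case Some(stored) if checker(stored, expectedCallerData) =>
              // "I wrote this conflicted ephemeral node ... I will backoff ... and retry"
              Thread.sleep(backoffTimeMs)   // spins until the old ZK session finally expires
            case Some(_) =>
              throw new RuntimeException("conflicting ephemeral node at " + path + " owned by another caller")
            case None =>
              // The node disappeared between the create attempt and the read; loop and try again.
          }
      }
    }
  }
}
{code}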
*How to reproduce (unsolved)*
Unfortunately I cannot consistently reproduce the issue, and to be honest I am
still not sure what actually triggers the bug. All I can do is summarize what I
did before and around the time the bug appeared; the bug itself I could only
observe through errors in the log files and through errors printed by certain
commands. So yes, it's a bit like shooting in the dark.
Here's an overview of my test setup:
- I deployed Kafka 0.8.1.1 to one machine {{kafka1}} and ZooKeeper 3.4.5 to a
second machine {{zookeeper1}}.
- I followed the Kafka 0.8.1 quick start guide to create a topic "test" with 1
partition and a replication factor of 1.
- I sent test messages to the topic "test" via the console producer.
- I read test messages from the topic "test" via the console consumer.
- Apart from producing and consuming a handful of test messages I also ran some
supposedly read-only admin commands such as "describing" the topic, and running
the consumer offset checker tool.
- At "some" point, Kafka was caught in an indefinite loop complaining about
"conflicted ephemeral node".
The following paragraphs list in more detail what I did before the error popped
up.
Producer:
{code}
$ sudo su - kafka
$ cd /opt/kafka
# This command returned no topics at this point = worked as expected
$ bin/kafka-topics.sh --list --zookeeper zookeeper1
# I created a topic, worked as expected
$ bin/kafka-topics.sh --create --zookeeper zookeeper1 --replication-factor 1 --partitions 1 --topic test
# I requested details of topic "test", worked as expected
$ bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
# I started a console producer and manually sent a handful of test messages,
# worked as expected (see consumer below)
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{code}
Consumer:
{code}
$ sudo su - kafka
$ cd /opt/kafka
# I started a console consumer, worked as expected (i.e. could read messages
# sent by producer, see above)
$ bin/kafka-console-consumer.sh --zookeeper zookeeper1 --topic test --from-beginning
{code}
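Just for completeness: the same produce step can also be driven programmatically with the old 0.8 Scala producer API instead of the console producer. A minimal sketch (broker address and topic mirror the setup above; treat it as illustrative, not as part of the actual test run):
{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SmokeTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "kafka1:9092")                 // the single broker from the setup above
    props.put("serializer.class", "kafka.serializer.StringEncoder")  // send plain String messages
    val producer = new Producer[String, String](new ProducerConfig(props))
    // Send a handful of test messages to the "test" topic, like the console producer did.
    (1 to 5).foreach(i => producer.send(new KeyedMessage[String, String]("test", "test message " + i)))
    producer.close()
  }
}
{code}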
Up to that point, everything worked. But then the Kafka broker went the way of
the dodo. As I said, I can't pinpoint the cause, and re-running the same
commands on a fresh Kafka/ZooKeeper deployment (fresh VMs, etc.) didn't
consistently trigger the issue as I had hoped.
Here's what I did after the commands above; at some point I eventually observed
the original error described in this JIRA ticket. Again, I cannot tell at the
moment what actually triggered the bug.
Producer:
{code}
# Test-driving the consumer offset checker
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zookeeper1
# At this point consumer "foo" was not expected to exist.
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zookeeper1 --broker-info --group foo
#
# I then re-started the console consumer (see below), now configured to use the
# group id "foo".
#
{code}
Consumer:
{code}
# I re-started the console consumer, now configured to use the group id "foo".
$ bin/kafka-console-consumer.sh --zookeeper zookeeper1 --topic test --from-beginning --group foo
{code}
At this point "describing" the topic gave the following info, indicating that
there was a problem (e.g. no leader for partition):
{code}
$ bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: -1      Replicas: 0     Isr:
{code}
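As a cross-check, the same information can be read straight out of ZooKeeper. A minimal diagnostic sketch (my own snippet, not a Kafka tool; it assumes the standard 0.8 znode layout under {{/controller}} and {{/brokers}}, and the zkclient library plus {{ZKStringSerializer}} that ship with Kafka):
{code}
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer
import scala.collection.JavaConverters._

object ZkDiagnostics {
  def main(args: Array[String]): Unit = {
    // Timeouts are arbitrary; zookeeper1:2181 matches the setup above.
    val zk = new ZkClient("zookeeper1:2181", 30000, 30000, ZKStringSerializer)
    try {
      // Which brokers hold a live registration, and who (if anyone) is currently controller?
      println("live brokers    = " + zk.getChildren("/brokers/ids").asScala)
      println("/controller     = " + zk.readData[String]("/controller", true))
      // Leader/ISR for partition 0 of topic "test"; a broken partition shows "leader":-1 here.
      println("partition state = " +
        zk.readData[String]("/brokers/topics/test/partitions/0/state", true))
    } finally {
      zk.close()
    }
  }
}
{code}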
Log files such as {{state-change.log}} showed these error messages:
{code}
[2014-04-29 07:44:47,816] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) for partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 1 with correlation id 7 (state.change.logger)
[2014-04-29 07:44:47,818] TRACE Controller 0 epoch 1 received response correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 (state.change.logger)
[2014-04-29 08:48:06,559] TRACE Controller 0 epoch 1 changed partition [test,0] state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-04-29 08:48:06,561] TRACE Controller 0 epoch 1 started leader election for partition [test,0] (state.change.logger)
[2014-04-29 08:48:06,574] ERROR Controller 0 epoch 1 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$
[2014-04-29 09:33:16,161] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398761286651"} stored data: {"version":1,"brokerid":0,"timestamp":"1398761286608"} (kafka.utils.ZkUtils$)
[2014-04-29 09:33:16,163] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398761286651"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}
I also tried a few admin commands to see what would happen:
{code}
$ bin/kafka-preferred-replica-election.sh --zookeeper zookeeper1 --path-to-json-file /tmp/test.json
Failed to start preferred replica election
kafka.common.AdminCommandFailedException: Admin command failed
        at kafka.admin.PreferredReplicaLeaderElectionCommand.moveLeaderToPreferredReplica(PreferredReplicaLeaderElectionCommand.scala:115)
        at kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:60)
        at kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
Caused by: kafka.admin.AdminOperationException: Preferred replica leader election currently in progress for Set(). Aborting operation
        at kafka.admin.PreferredReplicaLeaderElectionCommand$.writePreferredReplicaElectionData(PreferredReplicaLeaderElectionCommand.scala:101)
        at kafka.admin.PreferredReplicaLeaderElectionCommand.moveLeaderToPreferredReplica(PreferredReplicaLeaderElectionCommand.scala:113)
        ... 2 more
{code}
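The "currently in progress for Set()" error suggests that the {{/admin/preferred_replica_election}} znode already existed (presumably left over from an earlier attempt), since as far as I can tell the command aborts when it cannot create that path. A quick check along the same lines as the zkclient sketch above (illustrative only):
{code}
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

object CheckPendingElection {
  def main(args: Array[String]): Unit = {
    val zk = new ZkClient("zookeeper1:2181", 30000, 30000, ZKStringSerializer)
    try {
      // The admin command aborts when this znode already exists; its content (if any)
      // lists the partitions of the supposedly in-flight election.
      val pending = zk.readData[String]("/admin/preferred_replica_election", true)
      println("/admin/preferred_replica_election = " + pending)
    } finally {
      zk.close()
    }
  }
}
{code}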
> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.0
> Reporter: Sam Meder
> Assignee: Sam Meder
> Priority: Blocker
> Fix For: 0.8.0
>
> Attachments:
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>         electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it
> looks like a new registration will be added on every elect call - shouldn't it
> register in startup()?), so it can update leaderId to the current leader before
> the call to create. If that happens then we will continuously get node-exists
> exceptions and the checker function will always return true, i.e. we will
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when
> calling create, i.e.
>   createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>     electionPath, electString, brokerId,
>     (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>     controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the
> broker that owned the node previously, although I am still not 100% sure if
> that is sufficient.
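To make the race described above concrete, here is a tiny standalone model (illustrative only, not Kafka code; all names are made up): if the value the retry helper compares against is a mutable leaderId that the listener has already updated to the current controller, the conflict check always passes and the loop never exits; comparing against the caller's own fixed brokerId only keeps retrying when the stale node really was written by this broker.
{code}
// Illustrative model only: why checking against a mutable leaderId can spin forever,
// while checking against the caller's own brokerId terminates.
object EphemeralRetryModel {
  @volatile var leaderId: Int = -1   // stands in for the field the leaderChangeListener updates

  // Models the retry helper: the stored node's owner id vs. the value we expect to have written.
  def retryWhileOurs(storedOwnerId: Int, expectedOwner: () => Int): Unit = {
    while (storedOwnerId == expectedOwner()) {
      // "I wrote this conflicted ephemeral node ... I will backoff ... and retry"
      Thread.sleep(10)               // never exits if expectedOwner() tracks the current controller
    }
    // Mismatch: the stale node belongs to someone else, so the real helper would give up / rethrow.
  }

  def main(args: Array[String]): Unit = {
    val brokerId = 3
    leaderId = 2                     // the listener already saw broker 2 win the election
    // Buggy variant (would hang): retryWhileOurs(storedOwnerId = 2, expectedOwner = () => leaderId)
    retryWhileOurs(storedOwnerId = 2, expectedOwner = () => brokerId)   // fixed variant: exits at once
    println("checking against brokerId lets broker 3 leave the retry loop")
  }
}
{code}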