[ 
https://issues.apache.org/jira/browse/KAFKA-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507443#comment-15507443
 ] 

Flavio Junqueira commented on KAFKA-4196:
-----------------------------------------

From a cursory look, it seems we could see this behavior if:

# There is an event that triggers {{TopicsChangeListener.handleChildChange}}.
# The previous event is followed by a broker change: {{BrokerChangeListener}}.

According to the description, we do have the broker change event. The topics 
event only happens after the topic has been deleted from under 
{{/brokers/topics}} in zk, though. If the controller instance that triggers the 
first event is the same one that deletes the topic, then it doesn't look like we 
can have the behavior above because: 1) all those events are processed under the 
controller context lock; 2) the controller deletes the topic znodes and updates 
{{ControllerContext.partitionLeadershipInfo}} and 
{{controllerContext.partitionReplicaAssignment}}. Consequently, one possibility 
is a race between two controllers. One puzzling point is that the delete znode 
for the topic isn't going away, which indicates that no controller instance is 
successfully completing the delete operation.
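
To make the failure mode concrete, here is a minimal sketch of how a partition 
that is present in the leadership map but missing from the assignment map 
produces the {{key not found}} error from the description. This is not the 
controller code: {{InvariantSketch}} and {{replicasForLeaders}} are hypothetical 
names, {{TopicAndPartition}} is a simplified stand-in for the Kafka class, and 
the two maps are plain local maps rather than the real {{ControllerContext}} 
fields.

```scala
import scala.collection.mutable
import scala.util.Try

// Simplified stand-in for Kafka's TopicAndPartition (illustration only).
final case class TopicAndPartition(topic: String, partition: Int) {
  override def toString: String = s"[$topic,$partition]"
}

object InvariantSketch {
  // Hypothetical helper: looks up the replica assignment of every partition
  // that has leadership info, assuming the two maps always share their keys.
  // mutable.Map.apply throws NoSuchElementException for a missing key.
  def replicasForLeaders(
      leadership: mutable.Map[TopicAndPartition, Int],
      assignment: mutable.Map[TopicAndPartition, Seq[Int]]
  ): Try[List[Seq[Int]]] =
    Try(leadership.keys.toList.map(tp => assignment(tp)))

  def main(args: Array[String]): Unit = {
    val tp = TopicAndPartition("topic", 0)
    val leadership = mutable.HashMap(tp -> 1)
    val assignment = mutable.HashMap(tp -> Seq(1, 2))

    // Both maps agree, so the lookup succeeds.
    println(replicasForLeaders(leadership, assignment))

    // Only one of the two maps is cleaned up, so the invariant is broken
    // and the same lookup now fails.
    assignment.remove(tp)
    println(replicasForLeaders(leadership, assignment))
  }
}
```

If both removals always happen together under one lock, the second lookup can 
never observe the mismatch, which is why a single controller instance doesn't 
look sufficient to explain the error.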

I'd need to investigate some more to find the culprit. If it happens again and 
you have a chance, please upload the logs. I'll see if I can repro locally.

> Transient test failure: DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4196
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>              Labels: transient-unit-test-failure
>
> The error:
> {code}
> java.lang.AssertionError: Admin path /admin/delete_topic/topic path not deleted even after a replica is restarted
>       at org.junit.Assert.fail(Assert.java:88)
>       at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:752)
>       at kafka.utils.TestUtils$.verifyTopicDeletion(TestUtils.scala:1017)
>       at kafka.admin.DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK(DeleteConsumerGroupTest.scala:156)
> {code}
> Caused by a broken invariant in the Controller: a partition exists in 
> `ControllerContext.partitionLeadershipInfo`, but not 
> `controllerContext.partitionReplicaAssignment`.
> {code}
> [2016-09-20 06:45:13,967] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener:103)
> java.util.NoSuchElementException: key not found: [topic,0]
>       at scala.collection.MapLike$class.default(MapLike.scala:228)
>       at scala.collection.AbstractMap.default(Map.scala:58)
>       at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>       at kafka.controller.ControllerBrokerRequestBatch.kafka$controller$ControllerBrokerRequestBatch$$updateMetadataRequestMapFor$1(ControllerChannelManager.scala:310)
>       at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>       at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>       at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>       at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:343)
>       at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1030)
>       at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:492)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:376)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
>       at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
>       at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
