[ https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649485#comment-15649485 ]
Mayuresh Gharat commented on KAFKA-4362:
----------------------------------------

I did some more testing while reproducing these error scenarios. There are 2 bugs:

1) OFFSET COMMITS:
When we check for MessageFormatVersion, we first check whether the replica for the __consumer_offsets partition is local, and if it is not, we throw an IllegalArgumentException. This surfaces as an unknown error on the client side, and the commitOffset operation fails with an exception.
As a side effect, if you start another consumer in the same group consuming from the same topic after the rebalance is done and start producing to the topic, you will see that both consumers consume the same data. This is because the second consumer is talking to the right coordinator, while the first consumer is completely unaware of the presence of the second consumer.

2) CONSUMER REBALANCE:
During a topic reassignment, for example moving replicas from (1,2,3 [Leader : 1]) to (4,2,3 [Leader : 4]) for the __consumer_offsets topic, the controller sends a StopReplicaRequest to broker 1 for the __consumer_offsets topic. While handling this request on the server side, we never remove that partition of the __consumer_offsets topic from the coordinator's (broker 1) cache.
When a handleJoinGroupRequest comes in during rebalance, the coordinator (broker 1) checks whether the group is local. But since we did not remove the group from its cache on the earlier StopReplicaRequest from the controller, it does not return NotCoordinatorForGroupException but proceeds with success. So the consumer thinks that it is talking to the right coordinator (which is not the case, since we moved the coordinator from broker 1 to broker 4).
On the consumer side, in the JoinGroupResponseHandler callback, we send a SyncGroupRequest to broker 1, which in turn calls the code for checking the MessageFormatVersion on the server. At this point it throws an IllegalArgumentException for the same reason explained in point 1) above. This causes the SyncGroupRequest to fail with an unknown error in the SyncGroupResponseHandler callback.

The exact steps for reproducing these scenarios are as follows:

For 1)
a) Start 4 kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to the topic testtopicA.
c) Start a console consumer with groupId = testGroupA consuming from testtopicA.
d) Produce and consume some data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA.
f) Reassign the partition from e) such that the leader is removed from the replica list.
g) You should see frequent exceptions on the consumer side, something like this:
[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
h) If you still produce to the topic, you should be able to see data in the console of this consumer, but it is not able to commit offsets.
i) Now if you start another console consumer with the same groupId = testGroupA consuming from the same topic testtopicA and produce more data, you should be able to see the data in both consumer consoles.

For 2)
a) Start 4 kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to the topic testtopicA.
c) Start two console consumers, one after the other, each with groupId = testGroupA consuming from testtopicA.
d) Produce and consume some data. Exactly one of them should be consuming the data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA.
f) Reassign the partition from e) such that the leader is removed from the replica list.
g) You should see frequent exceptions on the consumer actually fetching data, something like this :
[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition testtopicA-0 at offset 14: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
h) If you kill this consumer, you should immediately see an exception on the other consumer console, something like this :
[2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:485)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
	at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
	at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
	at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
	at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

I have a fix for both of these issues and will upload a PR for it.

[~ijuma] yes the ticket that you mentioned seems a duplicate and can be closed.

> Consumer can fail after reassignment of the offsets topic partition
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4362
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Joel Koshy
>            Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 not found
>     at kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) ~[kafka_2.10.jar:?]
>     at kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633) ~[kafka_2.10.jar:?]
>     at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
>     at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632) ~[kafka_2.10.jar:?]
>     at ...
> {code}
> The issue is that the replica has been deleted so the
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions
> instead of the actual partition.)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
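
For readers following the CONSUMER REBALANCE scenario in the comment above, here is a rough, self-contained model of the stale-cache behaviour it describes. This is only a sketch under stated assumptions, not Kafka's code and not the fix from the eventual PR: the class StaleCoordinatorSketch, its methods, the evictOnStopReplica flag, and the hash-modulo mapping in partitionFor are all hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model only: none of these names are Kafka classes or methods.
final class StaleCoordinatorSketch {

    enum JoinResult { OK, NOT_COORDINATOR }

    static final class Coordinator {
        // Offsets-topic partitions this broker believes it coordinates.
        private final Set<Integer> ownedPartitions = new HashSet<>();
        // false models the reported bug: StopReplica leaves the cache untouched.
        private final boolean evictOnStopReplica;

        Coordinator(boolean evictOnStopReplica) {
            this.evictOnStopReplica = evictOnStopReplica;
        }

        void becomeLeader(int partition) {
            ownedPartitions.add(partition);
        }

        void handleStopReplica(int partition) {
            if (evictOnStopReplica) {
                ownedPartitions.remove(partition);
            }
        }

        // The "is the group local" check: it is only as good as the cache behind it.
        JoinResult handleJoinGroup(String groupId, int offsetsPartitionCount) {
            int partition = partitionFor(groupId, offsetsPartitionCount);
            return ownedPartitions.contains(partition)
                    ? JoinResult.OK
                    : JoinResult.NOT_COORDINATOR;
        }
    }

    // Assumed, simplified mapping from a group id to an offsets-topic partition.
    static int partitionFor(String groupId, int partitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        int count = 50; // assumed partition count, for illustration only
        int partition = partitionFor("testGroupA", count);

        Coordinator stale = new Coordinator(false);
        stale.becomeLeader(partition);
        stale.handleStopReplica(partition);
        System.out.println("without eviction: " + stale.handleJoinGroup("testGroupA", count));

        Coordinator evicting = new Coordinator(true);
        evicting.becomeLeader(partition);
        evicting.handleStopReplica(partition);
        System.out.println("with eviction: " + evicting.handleJoinGroup("testGroupA", count));
    }
}
```

In this model, the broker that keeps its cache after the stop-replica step still answers the join as if it were the coordinator, which mirrors the situation where the consumer goes on to send its SyncGroupRequest to the old broker. Evicting the partition on stop-replica makes the same join return the not-coordinator result, which would let a client rediscover the coordinator.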