[ https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649485#comment-15649485 ]

Mayuresh Gharat commented on KAFKA-4362:
----------------------------------------

I did some more testing while reproducing these error scenarios.
There are 2 bugs:
1) OFFSET COMMITS:
When we check the MessageFormatVersion, we actually first check whether the 
replica of the __consumer_offsets partition is local, and if it is not, we 
throw an IllegalArgumentException. This surfaces as an unknown error on the 
client side, and the commitOffset operation fails with an exception.
As a side effect of this, if you start another consumer in the same group 
consuming from the same topic after the rebalance is done and keep producing 
to the topic, you will see that both consumers consume the same data. This is 
because the second consumer you started is talking to the right coordinator, 
while the first consumer is completely unaware of the second consumer's 
presence.
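
Roughly, the broker-side behaviour looks like the following sketch (this is a 
simplified illustration, not the actual GroupMetadataManager code; 
lookupLocalMessageFormat is a hypothetical stand-in for the replica-manager 
lookup):

{code}
// A minimal, self-contained sketch of the check described above. Once the
// __consumer_offsets replica has been reassigned away from this broker, the
// local lookup comes back empty and an IllegalArgumentException is thrown,
// which the client only sees as an unknown error. Returning
// NOT_COORDINATOR_FOR_GROUP instead would let the consumer rediscover the
// coordinator.
object OffsetCommitCheckSketch {
  // Hypothetical stand-in for the replica-manager lookup; None models the
  // replica no longer being local after reassignment.
  def lookupLocalMessageFormat(offsetsPartition: Int): Option[Byte] = None

  def messageFormatVersionOrThrow(offsetsPartition: Int): Byte =
    lookupLocalMessageFormat(offsetsPartition).getOrElse {
      throw new IllegalArgumentException(
        s"Message format version for partition $offsetsPartition not found")
    }

  def main(args: Array[String]): Unit = {
    // Simulates an offset commit arriving after the replica has moved away:
    // blows up, surfacing as an unknown error to the client.
    messageFormatVersionOrThrow(31)
  }
}
{code}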

2) CONSUMER REBALANCE:
While doing a partition reassignment for the __consumer_offsets topic, for 
example moving the replicas from (1,2,3 [Leader: 1]) to (4,2,3 [Leader: 4]), 
the controller sends a StopReplicaRequest for that partition to broker 1. While 
handling this request on the server side, we never remove that partition (and 
its groups) from the coordinator's (broker 1's) cache. When a JoinGroupRequest 
comes in during the rebalance, the coordinator (broker 1) does check whether 
the group is local; but since the group was never removed from its cache on the 
earlier StopReplicaRequest from the controller, it does not return 
NotCoordinatorForGroupException and proceeds as if everything succeeded. So the 
consumer thinks it is talking to the right coordinator (which is not the case, 
since the coordinator moved from broker 1 to broker 4). On the consumer side, 
in the JoinGroupResponseHandler callback, we send a SyncGroupRequest to broker 
1, which in turn runs the MessageFormatVersion check on the server. At this 
point it throws an IllegalArgumentException for the same reason explained in 
point 1) above, which causes the SyncGroupRequest to fail with an unknown 
exception in the SyncGroupResponseHandler callback.
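
The missing cleanup can be illustrated with a similar sketch (again simplified, 
not the actual GroupCoordinator/GroupMetadataManager code; the cache shape is 
an assumption made for illustration):

{code}
// A minimal sketch of the caching behaviour described above. The point is that
// onStopReplica-style cleanup never happens today, so isGroupLocal keeps
// returning true on broker 1 and JoinGroup succeeds there even though the
// coordinator has moved to broker 4.
import scala.collection.mutable

object CoordinatorCacheSketch {
  // groupId -> __consumer_offsets partition that owns it (illustrative shape)
  private val groupCache = mutable.Map.empty[String, Int]

  def isGroupLocal(groupId: String): Boolean = groupCache.contains(groupId)

  // What should happen when the controller's StopReplicaRequest removes an
  // __consumer_offsets partition from this broker: drop the groups that map to
  // it so later JoinGroup/SyncGroup requests get NotCoordinatorForGroupException
  // and the client rediscovers the new coordinator.
  def onStopReplica(offsetsPartition: Int): Unit =
    groupCache.retain { case (_, p) => p != offsetsPartition }
}
{code}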

The exact steps for reproducing these scenarios are as follows:
For 1)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA. 
c) Start a console consumer with groupId = testGroupA consuming from 
testtopicA.
d) Produce and consume some data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA 
(a small helper for this is sketched after this list).
f) Reassign the partition from e) such that the current leader is removed from 
the replica list.
g) You should see frequent exceptions on the consumer side, something like 
this:

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

h) If you keep producing to the topic, you should still see data in this 
consumer's console, but it is not able to commit offsets.
i) Now, if you start another console consumer with the same groupId = 
testGroupA consuming from the same topic testtopicA and produce more data, you 
should see the data in both consumer consoles.
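
For step e), the offsets partition for a group should be the group id hash 
modulo the number of __consumer_offsets partitions 
(offsets.topic.num.partitions, 50 by default). A small helper, assuming the 
default partition count:

{code}
// Computes which __consumer_offsets partition holds the offsets for a group,
// which should match the coordinator's groupId-hash-modulo-partition-count
// mapping. Assumes the default of 50 offsets partitions; adjust the argument
// if your cluster overrides offsets.topic.num.partitions.
object OffsetsPartitionForGroup {
  def partitionFor(groupId: String, numOffsetsPartitions: Int = 50): Int =
    (groupId.hashCode & 0x7fffffff) % numOffsetsPartitions

  def main(args: Array[String]): Unit = {
    // For steps e)/f): this is the __consumer_offsets partition to reassign.
    println(partitionFor("testGroupA"))
  }
}
{code}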


For 2)
a) Start 4 Kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to testtopicA. 
c) Start a console consumer with groupId = testGroupA consuming from 
testtopicA.
d) Start another console consumer with the same groupId = testGroupA consuming 
from testtopicA.
e) Produce and consume some data. Exactly one of the consumers should be 
consuming the data.
f) Find the __consumer_offsets partition that stores the offsets for testGroupA 
(same helper as in 1) above).
g) Reassign the partition from f) such that the current leader is removed from 
the replica list.
h) You should see frequent exceptions on the consumer that is actually fetching 
data, something like this:

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

i) If you kill this consumer, you should immediately see an exception on the 
other consumer's console, something like this:

[2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
server experienced an unexpected error when processing the request
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:485)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
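
For completeness, the same failures can be observed programmatically with the 
new consumer instead of the console consumer. Below is a minimal sketch against 
the 0.10.x Java client API (the broker address, topic and group are just the 
ones from the steps above; the exception handling is illustrative):

{code}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import scala.collection.JavaConverters._

// With the coordinator's __consumer_offsets partition moved away, poll() keeps
// returning records (fetches go to the topic partition leader), commitSync()
// fails with the unknown server error from 1), and after the other consumer is
// killed the SyncGroup failure from 2) surfaces out of poll() as a KafkaException.
object ReproConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // adjust for your test cluster
    props.put("group.id", "testGroupA")
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("testtopicA"))
    try {
      while (true) {
        val records = consumer.poll(1000)
        records.asScala.foreach(r => println(s"${r.offset()}: ${r.value()}"))
        try consumer.commitSync()
        catch { case e: KafkaException => println(s"commit failed: ${e.getMessage}") }
      }
    } finally {
      consumer.close()
    }
  }
}
{code}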

I have a fix for both of these issues and will upload a PR for it.
[~ijuma] Yes, the ticket you mentioned seems to be a duplicate and can be closed.

> Consumer can fail after reassignment of the offsets topic partition
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4362
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Joel Koshy
>            Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
>     at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
>     at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
>     at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
>     at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
>     at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)


