[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979146#comment-14979146
 ] 

Guozhang Wang edited comment on KAFKA-2017 at 10/28/15 8:08 PM:
----------------------------------------------------------------

[~onurkaraman] [~becket_qin] [~jjkoshy] I have discussed with [~junrao] and 
[~hachikuji] about various options for 0.9.0:

1. We realized relaxing the generation id check for commit offset while not 
persisting grouping state does not perfectly solve the problem. Since when 
coordinator migrates, consumers will 1) discover the new coordinator, 2) send 
the HB request as scheduled without stop fetching. With a group of more than 
one consumer, it is likely that a first consumer member will find the new 
coordinator and send the HB request, then got the error back and rejoin the 
group. Since today coordinator will immediately create the group when receiving 
a join group request from an unknown group for the first time and finish the 
join-group phase immediately, right after that other consumer member's commit 
request will be rejected, hence still causing duplicates during coordinator 
migration.

We have talked about delaying the creation of the group up to the session 
timeout for the first-ever join group, or relax the offset commit checking 
further to completely ignore the group id and always blindly accepts the 
requests. But those solutions also have their problems, as the former approach 
could delay the creation of the group by 30 seconds (as the default value of 
session timeout), and the latter approach cannot distinguish consumers using 
Kafka for coordinations with other consumers that get assignments themselves. 
So we think it is still necessary to have this feature in 0.9.0 release.

2. We also went through implementation details to enforce persistency in Kafka:

a) If we are going to use two topics, one for offset and one for group 
metadata, then we need to make sure these two topics will ALWAYS have the same 
leader (i.e. the coordinator) for their partitions. However, with the current 
reassignment mechanism, consecutive reassignments from bouncing brokers / 
broker failures cannot easily ensure that is the case. We can of course 
refactor the offset manager as a general key-value storage with multiple 
topics, but that is a much larger feature to add that is way beyond the scope 
of 0.9.0.

b) If we are going to use the same topics with the new message format as Joel 
proposed, it is not clear how we can use log compaction to delete the old 
formatted messages as they will be different keys. If we are going to keep 
messages of both versions, it will further increase the latency of loading the 
whole log for consumer group metadata upon coordinator migration, and also we 
need to change the caching layer behavior to be able to override values while 
loading offsets from logs.

c) Instead, what we can do with the same topics is to use the key version as 
the "type indicator": since both key and value have their own versions, we can 
use key version number to indicate the type of the message, for 0 it is the 
offset message, and for 1 it is the group metadata offset message. The value 
versions for offset and group metadata messages can still evolve separately; 
and we will never evolve key versions moving forward (we cannot do this even 
today anyways because of log compaction), but just change the topic if we ever 
have to do so.

With this proposal: 1) OffsetManager will become ConsumerGroupManager, thought 
its related config names will still be "offsetXXX" for now since they are 
public, 2) loading a message from log will either return an offset object or 
group metadata object, both of which will be kept inside ConsumerGroupManager's 
cache, 3) we will store the assignment along with the metadata only after the 
sync phase is complete; for MM this assignment could be large and hence we may 
want to reconfig the "offsetMaxRecordSize" to handle this, 4) we will still 
need KIP-40 for querying the group metadata / assignment from 
ConsumerGroupManager's cache.

Thoughts?


was (Author: guozhang):
[~onurkaraman] [~becket_qin] [~jjkoshy] I have discussed with [~junrao] and 
[~hachikuji] about various options for 0.9.0:

1. We realized relaxing the generation id check for commit offset while not 
persisting grouping state does not perfectly solve the problem. Since when 
coordinator migrates, consumers will 1) discover the new coordinator, 2) send 
the HB request as scheduled without stop fetching. With a group of more than 
one consumer, it is likely that a first consumer member will find the new 
coordinator and send the HB request, then got the error back and rejoin the 
group. Since today coordinator will immediately create the group when receiving 
a join group request from an unknown group for the first time and finish the 
join-group phase immediately, right after that other consumer member's commit 
request will be rejected, hence still causing duplicates during coordinator 
migration.

We have talked about delaying the creation of the group up to the session 
timeout for the first-ever join group, or relax the offset commit checking 
further to completely ignore the group id and always blindly accepts the 
requests. But those solutions also have their problems, as the former approach 
could delay the creation of the group by 30 seconds (as the default value of 
session timeout), and the latter approach cannot distinguish consumers using 
Kafka for coordinations with other consumers that get assignments themselves. 
So we think it is still necessary to have this feature in 0.9.0 release.

2. We also went through implementation details to enforce persistency in Kafka, 
and felt that it still have many tricky cases to be done right, for example:

a) If we are going to use two topics, one for offset and one for group 
metadata, then we need to make sure these two topics will ALWAYS have the same 
leader (i.e. the coordinator) for their partitions. However, with the current 
reassignment mechanism, consecutive reassignments from bouncing brokers / 
broker failures cannot easily ensure that is the case. We can of course 
refactor the offset manager as a general key-value storage with multiple 
topics, but that is a much larger feature to add that is way beyond the scope 
of 0.9.0.

b) If we are going to use the same topics with the new message format as Joel 
proposed, it is not clear how we can use log compaction to delete the old 
formatted messages as they will be different keys. If we are going to keep 
messages of both versions, it will further increase the latency of loading the 
whole log for consumer group metadata upon coordinator migration, and also we 
need to change the caching layer behavior to be able to override values while 
loading offsets from logs.

c) Instead, what we can do with the same topics is to use the key version as 
the "type indicator": since both key and value have their own versions, we can 
use key version number to indicate the type of the message, for 0 it is the 
offset message, and for 1 it is the group metadata offset message. The value 
versions for offset and group metadata messages can still evolve separately; 
and we will never evolve key versions moving forward (we cannot do this even 
today anyways because of log compaction), but just change the topic if we ever 
have to do so.

With this proposal: 1) OffsetManager will become ConsumerGroupManager, thought 
its related config names will still be "offsetXXX" for now since they are 
public, 2) loading a message from log will either return an offset object or 
group metadata object, both of which will be kept inside ConsumerGroupManager's 
cache, 3) we will store the assignment along with the metadata only after the 
sync phase is complete; for MM this assignment could be large and hence we may 
want to reconfig the "offsetMaxRecordSize" to handle this, 4) we will still 
need KIP-40 for querying the group metadata / assignment from 
ConsumerGroupManager's cache.

Thoughts?

> Persist Coordinator State for Coordinator Failover
> --------------------------------------------------
>
>                 Key: KAFKA-2017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2017
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Onur Karaman
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0.0
>
>         Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to