[
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979146#comment-14979146
]
Guozhang Wang edited comment on KAFKA-2017 at 10/28/15 8:08 PM:
----------------------------------------------------------------
[~onurkaraman] [~becket_qin] [~jjkoshy] I have discussed with [~junrao] and
[~hachikuji] about various options for 0.9.0:
1. We realized that relaxing the generation id check for offset commits while
not persisting group state does not fully solve the problem. When the
coordinator migrates, consumers will 1) discover the new coordinator, and 2)
send heartbeat (HB) requests as scheduled without stopping fetching. With a
group of more than one consumer, it is likely that one consumer member will
find the new coordinator first, send an HB request, get an error back, and
rejoin the group. Since today the coordinator immediately creates the group
upon receiving a join-group request from an unknown group and finishes the
join-group phase immediately, the other consumer members' commit requests
will be rejected right after that, still causing duplicates during
coordinator migration.
We have talked about delaying the creation of the group for up to the session
timeout on the first-ever join-group, or relaxing the offset commit check
further to completely ignore the group id and blindly accept all requests.
But those solutions have their own problems: the former approach could delay
the creation of the group by 30 seconds (the default session timeout), and
the latter approach cannot distinguish consumers using Kafka for coordination
from consumers that manage assignments themselves. So we think it is still
necessary to have this feature in the 0.9.0 release.
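The race in point 1 can be sketched roughly as follows. This is an
illustrative toy model, not the actual broker code: the class, method names,
and group bookkeeping are all hypothetical, though the ILLEGAL_GENERATION
error code (22) is the real one.

```java
import java.util.HashMap;
import java.util.Map;

public class GenerationCheckSketch {
    static final short NONE = 0;
    static final short ILLEGAL_GENERATION = 22; // Kafka's actual error code

    // In-memory only: a new coordinator starts with no groups, which is
    // exactly the problem this ticket is about.
    static Map<String, Integer> groupGeneration = new HashMap<>();

    // The first join-group from an unknown group creates it immediately
    // at generation 1 (or bumps the generation on rebalance).
    static int handleJoinGroup(String groupId) {
        return groupGeneration.merge(groupId, 1, (old, one) -> old + 1);
    }

    // An offset commit is rejected when the member's generation is stale
    // or the group is unknown.
    static short handleOffsetCommit(String groupId, int memberGeneration) {
        Integer current = groupGeneration.get(groupId);
        if (current == null || memberGeneration != current)
            return ILLEGAL_GENERATION;
        return NONE;
    }

    public static void main(String[] args) {
        // Member A finds the new coordinator first and rejoins: the group
        // is (re)created at generation 1, since no state was persisted.
        handleJoinGroup("mirror-maker");
        // Member B, still on the old coordinator's generation (say 5),
        // commits and is rejected, so its uncommitted messages will be
        // re-consumed after it rejoins: duplicates.
        short err = handleOffsetCommit("mirror-maker", 5);
        System.out.println(err == ILLEGAL_GENERATION); // prints true
    }
}
```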
2. We also went through the implementation details of enforcing persistence in Kafka:
a) If we use two topics, one for offsets and one for group metadata, then we
need to make sure the corresponding partitions of these two topics ALWAYS
have the same leader (i.e. the coordinator). However, with the current
reassignment mechanism, consecutive reassignments from bouncing brokers /
broker failures cannot easily guarantee this. We could of course refactor the
offset manager into a general key-value store over multiple topics, but that
is a much larger feature, well beyond the scope of 0.9.0.
b) If we use the same topic with the new message format as Joel proposed, it
is not clear how log compaction could delete the old-format messages, since
they have different keys. If we keep messages of both versions, that further
increases the latency of loading the whole log of consumer group metadata
upon coordinator migration, and we also need to change the caching layer to
be able to override values while loading offsets from the log.
c) Instead, what we can do with the same topic is use the key version as a
"type indicator": since the key and value each have their own version, we can
use the key version number to indicate the type of the message: 0 for an
offset message, 1 for a group metadata message. The value versions for offset
and group metadata messages can still evolve separately; and we will never
evolve key versions going forward (we cannot do this even today because of
log compaction), but would instead change the topic if we ever have to.
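The type-indicator idea in (c) can be sketched as below. The field layouts
are illustrative placeholders, not the real schemas; only the leading
two-byte key version, with 0 meaning offset and 1 meaning group metadata, is
the point being demonstrated.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyVersionSketch {
    // Hypothetical offset key: [version=0][group][topic][partition].
    static ByteBuffer offsetKey(String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        buf.putShort((short) 0);               // key version 0 => offset message
        buf.putShort((short) g.length).put(g);
        buf.putShort((short) t.length).put(t);
        buf.putInt(partition);
        buf.flip();
        return buf;
    }

    // Hypothetical group metadata key: [version=1][group].
    static ByteBuffer groupMetadataKey(String group) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length);
        buf.putShort((short) 1);               // key version 1 => group metadata
        buf.putShort((short) g.length).put(g);
        buf.flip();
        return buf;
    }

    // On load, the first two bytes tell the coordinator which value schema
    // applies to the rest of the message.
    static String typeOf(ByteBuffer key) {
        short version = key.getShort(0);
        return version == 0 ? "offset" : "group-metadata";
    }
}
```

Because the key version is never evolved, compaction keeps working: two
offset commits for the same group/topic/partition produce byte-identical
keys, so the older one can still be compacted away.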
With this proposal: 1) OffsetManager will become ConsumerGroupManager, though
its related config names will remain "offsetXXX" for now since they are
public; 2) loading a message from the log will return either an offset object
or a group metadata object, both of which will be kept inside
ConsumerGroupManager's cache; 3) we will store the assignment along with the
metadata only after the sync phase is complete; for MM (MirrorMaker) this
assignment could be large, so we may want to reconfigure
"offsetMaxRecordSize" to handle it; 4) we will still need KIP-40 for querying
the group metadata / assignment from ConsumerGroupManager's cache.
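Point 2 of the proposal, one load pass filling a single manager's cache with
two kinds of objects, might look roughly like this. The class, the Entry
shape, and the string-valued metadata are all simplifications for
illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupCacheSketch {
    // A decoded log record: the key version picks the value type.
    static class Entry {
        final short keyVersion;
        final String key;
        final String value;
        Entry(short keyVersion, String key, String value) {
            this.keyVersion = keyVersion;
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, Long> offsetCache = new HashMap<>();
    static Map<String, String> groupMetadataCache = new HashMap<>();

    // Later records override earlier ones, mirroring log-compaction
    // semantics where only the latest value per key matters.
    static void load(Iterable<Entry> log) {
        for (Entry e : log) {
            if (e.keyVersion == 0)
                offsetCache.put(e.key, Long.parseLong(e.value));
            else
                groupMetadataCache.put(e.key, e.value);
        }
    }
}
```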
Thoughts?
> Persist Coordinator State for Coordinator Failover
> --------------------------------------------------
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Affects Versions: 0.9.0.0
> Reporter: Onur Karaman
> Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch,
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to fail over
> to a new coordinator without forcing all the consumers to rejoin their
> groups. This is possible if the coordinator persists its state so that the
> state can be transferred during coordinator failover. This state consists of
> most of the information in GroupRegistry and ConsumerRegistry.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)