Hi Omnia,
Thanks for your questions.

The DR angle on `group.type` is interesting and I had not considered it. The 
namespace of groups contains
both consumer groups and share groups, so I was trying to ensure that which 
group type was used was
deterministic rather than a race to create the first member. There are already 
other uses of the group protocol
such as Kafka Connect, so it’s all a bit confusing even today.

It is actually KIP-848 which introduces configurations for group resources and 
KIP-932 is just building on
the idea. I think that MM2 will need to sync these configurations. The question 
of whether `group.type` is
a sensible configuration I think is separate.

Imagine that we do have `group.type` as a group configuration. How would we end 
up with groups with
the same ID but different types on the two ends of MM2? Assuming that both ends 
have KIP-932 enabled,
either the configuration was not set, and a consumer group was made on one end 
while a share group was
made on the other, OR, the configuration was set but its value changed, and 
again we get a divergence.

I think that on balance, having `group.type` as a configuration does at least 
mean there’s a better chance that
the two ends of MM2 do agree on the type of group. I’m happy to consider other 
ways to do this better. The
fact that we have different kinds of group in the same namespace is the tricky 
thing. I think this was possible
before this KIP, but it’s much more likely now.


Onto the question of memory. There are several different parts to this, all of 
which are distributed across
the cluster.

* For the group coordinator, the memory consumption will be affected by the 
number of groups,
the number of members and the number of topic-partitions to be assigned to the 
members. The
group coordinator is concerned with membership and assignment, so the memory 
per topic-partition
will be small.
* For the share coordinator, the memory consumption will be affected by the 
number of groups, the
number of topic-partitions being consumed in the group, and the number of 
in-flight records, but not
the number of members. We should be talking about no more than kilobytes per 
topic-partition.
* For the share-partition leader, the memory consumption will be affected by 
the number of share group
members assigned the topic-partition and the number of in-flight records. 
Again, we should be talking
about no more than kilobytes per topic-partition.

Of these, the factor that is not directly under control is the number of 
topic-partitions. The reason is that
I wanted to avoid a situation where the number of partitions in a topic was 
increased and suddenly
consumption in a share group hit a limit that was not anticipated.

I could introduce a configuration for controlling the number of topics allowed 
to be subscribed in a share
group. Personally, I think 1 would be a good starting point.

Let me know what you think.

Thanks,
Andrew


> On 2 Apr 2024, at 15:39, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> 
> Hi Andrew, 
> Thanks for the KIP it is definitely an interesting read. I have few questions 
> As the KIP proposing extending `AdminClient.incrementalAlterConfigs` to add 
> an explicit `group.type` what would this means for DR feature in MM2 
> offering? 
> Right now MM2 sync consumer group offsets from source to destination cluster. 
> And it also offer sync ACLs which contribute to DR feature. Would this KIP 
> means MM2 needs to also sync the type of groups to destination? 
> As `AdminClient.incrementalAlterConfigs` means "when a new group is created 
> with this name, it must have this type”. What will happened if clusters on 
> both ends of MM2 has same group id but with different types? 
> If this concern is out of the scope we might need to call this out somewhere 
> in the KIP. 
> While the number of share-group and the number of consumers in share-group is 
> limited by `group.share.max.groups`and `group.share.max.size` the total 
> number of share-group state records that might need to be loaded in-memeory 
> has another factor which is the number of partitions. In cases where group is 
> consuming from large number of topics with large number of partitions what 
> will be the impact on coordinator memory?
> 
> Thanks 
> Omnia
> 
> 
>> On 25 Mar 2024, at 10:23, Andrew Schofield 
>> <andrew_schofield_j...@outlook.com> wrote:
>> 
>> Hi Justine,
>> Thanks for your questions.
>> 
>> There are several limits in this KIP. With consumer groups, we see problems
>> where there are huge numbers of consumer groups, and we also see problems
>> when there are huge number of members in a consumer group.
>> 
>> There’s a limit on the number of members in share group. When the limit is 
>> reached,
>> additional members are not admitted to the group. The members heartbeat to 
>> remain
>> in the group and that enables timely expiration.
>> 
>> There’s also a limit of the number of share groups in a cluster. Initially, 
>> this limit has been
>> set low. As a result, it would be possible to create sufficient groups to 
>> reach the limit,
>> and then creating additional groups will fail. It will be possible to delete 
>> a share group
>> administratively, but share groups do not automatically expire (just like 
>> topics do not
>> expire and queues in message-queuing systems do not expire).
>> 
>> The `kafka-console-share-consumer.sh` tool in the KIP defaults the group 
>> name to
>> “share”. This has two benefits. First, it means that the trivial exploratory 
>> use of it running
>> multiple concurrent copies will naturally get sharing of the records 
>> consumed.
>> Second, it means that only one share group is being create, rather than 
>> generating another
>> group ID for each execution.
>> 
>> Please do re-read the read-committed section. I’ll grateful for all the 
>> thoughtful reviews
>> that the community is able to provide. The KIP says that broker-side 
>> filtering
>> removes the records for aborted transactions. This is obviously quite a 
>> difference compared
>> with consumers in consumer groups. It think it would also be possible to do 
>> it client-side
>> but the records fetched from the replica manager are distributed among the 
>> consumers,
>> and I’m concerned that it would be difficult to distribute the list of 
>> aborted transactions
>> relevant to the records each consumer receives. I’m considering prototyping 
>> client-side
>> filtering to see how well it works in practice.
>> 
>> I am definitely thoughtful about the inter-broker hops in order to persist 
>> the share-group
>> state. Originally, I did look at writing the state directly into the user’s 
>> topic-partitions
>> because this means the share-partition leader would be able to write 
>> directly.
>> This has downsides as documented in the “Rejected Alternatives” section of 
>> the KIP.
>> 
>> We do have opportunities for pipelining and batching which I expect we will 
>> exploit
>> in order to improve the performance.
>> 
>> This KIP is only the beginning. I expect a future KIP will address storage 
>> of metadata
>> in a more performant way.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 21 Mar 2024, at 15:40, Justine Olshan <jols...@confluent.io.INVALID> 
>>> wrote:
>>> 
>>> Thanks Andrew,
>>> 
>>> That answers some of the questions I have.
>>> 
>>> With respect to the limits -- how will this be implemented? One issue we
>>> saw with producers is "short-lived" producers that send one message and
>>> disconnect.
>>> Due to how expiration works for producer state, if we have a simple limit
>>> for producer IDs, all new producers are blocked until the old ones expire.
>>> Will we block new group members as well if we reach our limit?
>>> 
>>> In the consumer case, we have a heartbeat which can be used for expiration
>>> behavior and avoid the headache we see on the producer side, but I can
>>> imagine a case where misuse of the groups themselves could occur -- ie
>>> creating a short lived share group that I believe will take some time to
>>> expire. Do we have considerations for this case?
>>> 
>>> I also plan to re-read the read-committed section and may have further
>>> questions there.
>>> 
>>> You also mentioned in the KIP how there are a few inter-broker hops to the
>>> share coordinator, etc for a given read operation of a partition. Are we
>>> concerned about performance here? My work in transactions and trying to
>>> optimize performance made me realize how expensive these inter-broker hops
>>> can be.
>>> 
>>> Justine
>>> 
>>> On Thu, Mar 21, 2024 at 7:37 AM Andrew Schofield <
>>> andrew_schofield_j...@outlook.com> wrote:
>>> 
>>>> Hi Justine,
>>>> Thanks for your comment. Sorry for the delay responding.
>>>> 
>>>> It was not my intent to leave a query unanswered. I have modified the KIP
>>>> as a result
>>>> of the discussion and I think perhaps I didn’t neatly close off the email
>>>> thread.
>>>> 
>>>> In summary:
>>>> 
>>>> * The share-partition leader does not maintain an explicit cache of
>>>> records that it has
>>>> fetched. When it fetches records, it does “shallow” iteration to look at
>>>> the batch
>>>> headers only so that it understands at least the base/last offset of the
>>>> records within.
>>>> It is left to the consumers to do the “deep” iteration of the records
>>>> batches they fetch.
>>>> 
>>>> * It may sometimes be necessary to re-fetch records for redelivery. This
>>>> is essentially
>>>> analogous to two consumer groups independently fetching the same records
>>>> today.
>>>> We will be relying on the efficiency of the page cache.
>>>> 
>>>> * The zero-copy optimisation is not possible for records fetched for
>>>> consumers in
>>>> share groups. The KIP does not affect the use of the zero-copy
>>>> optimisation for any
>>>> scenarios in which it currently applies (this was not true in one earlier
>>>> version of the KIP).
>>>> 
>>>> * I share concern about memory usage, partly because of the producer state
>>>> management
>>>> area. To keep a lid on memory use, the number of share groups, the number
>>>> of members
>>>> of each share group, and the number of in-flight messages per partition in
>>>> a share group
>>>> are all limited. The aim is to get the in-memory state to be nice and
>>>> compact, probably
>>>> at the expense of throughput. Over time, I’m sure we’ll optimise and get a
>>>> bit more
>>>> headroom. Limits like these cannot easily be applied retrospectively, so
>>>> the limits are
>>>> there right at the start.
>>>> 
>>>> * I have reworked the section on read-committed isolation level, and the
>>>> complexity
>>>> and memory usage of the approach is significantly improved.
>>>> 
>>>> I hope this answers your question.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>> 
>>>>> On 18 Mar 2024, at 20:47, Justine Olshan <jols...@confluent.io.INVALID>
>>>> wrote:
>>>>> 
>>>>> Hey Andrew,
>>>>> 
>>>>> I noticed you started the voting thread, but there seems to be a few
>>>>> questions that were not answered. One was Jun's about memory usage
>>>>>> How much additional heap memory will the server use? Do we need to cache
>>>>> records in heap? If so, is the cache bounded?
>>>>> 
>>>>> Your response was
>>>>>> This area needs more work. Using a share group surely gets the broker to
>>>>> do
>>>>> more manipulation of the data that it fetches than a regular consumer. I
>>>>> want to minimise
>>>>> this and need to research before providing a comprehensive answer. I
>>>>> suspect zero-copy
>>>>> is lost and that we do not cache records in heap. I will confirm later
>>>> on.
>>>>> 
>>>>> I am also concerned about memory usage from my producer state management
>>>>> work, so I want to make sure we have thought about it here -- not just in
>>>>> the case Jun mentioned but generally.
>>>>> 
>>>>> Likewise, we have seen issues with large consumer groups and too many
>>>>> producer IDs. Are there any concerns with an analogous situation with too
>>>>> many share group members or share groups? Are they any ways we try to
>>>>> handle this or mitigate risks with respect to memory usage and client
>>>>> connections (wrt to rebalances for example)
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Justine
>>>>> 
>>>>> On Fri, Mar 8, 2024 at 12:51 AM Andrew Schofield <
>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>> 
>>>>>> Hi Manikumar,
>>>>>> Thanks for your queries.
>>>>>> 
>>>>>> 1) Delivery count is added to the ConsumerRecord class so that a
>>>> consumer
>>>>>> can
>>>>>> tell how often a record has been processed. I imagine that some
>>>>>> applications might
>>>>>> want to take different actions based on whether a record has previously
>>>>>> failed. This
>>>>>> enables richer error handling for bad records. In the future, I plan
>>>>>> another KIP to
>>>>>> enhance error handling.
>>>>>> 
>>>>>> 2) It is only possible to delete a share group which is empty. As a
>>>>>> result, all
>>>>>> well-behaved consumers will have closed their share sessions. After a
>>>>>> short while,
>>>>>> the share-partition leaders will discard the share-partition information
>>>>>> from memory.
>>>>>> 
>>>>>> In the presence of badly behaved consumers, a consumer would have to
>>>>>> pretend to
>>>>>> be a member of a share group. There are several cases:
>>>>>> 
>>>>>> a) If the share-partition leader still has in-memory state for the
>>>> deleted
>>>>>> share-group, it will
>>>>>> continue to fetch records but it will be fenced by the share coordinator
>>>>>> when it attempts to
>>>>>> write its persistent state.
>>>>>> 
>>>>>> b) If the share-partition leader does not have in-memory state, it will
>>>>>> attempt to read it
>>>>>> from the share coordinator and this will fail.
>>>>>> 
>>>>>> 3) I will add metrics for the share coordinator today. This was an
>>>>>> omission. Thanks for catching it.
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>>> On 6 Mar 2024, at 17:53, Manikumar <manikumar.re...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Andrew,
>>>>>>> 
>>>>>>> Thanks for the updated KIP. Few queries below:
>>>>>>> 
>>>>>>> 1. What is the use-case of deliveryCount in ShareFetchResponse?
>>>>>>> 2. During delete share groups, Do we need to clean any in-memory state
>>>>>> from
>>>>>>> share-partition leaders?
>>>>>>> 3. Any metrics for the share-coordinator?
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Manikumar
>>>>>>> 
>>>>>>> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
>>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>> 
>>>>>>>> Hi Manikumar,
>>>>>>>> Thanks for your comments.
>>>>>>>> 
>>>>>>>> 1. I believe that in general, there are not situations in which a
>>>>>> dynamic
>>>>>>>> config
>>>>>>>> change is prevented because of the existence of a resource. So, if we
>>>>>>>> prevented
>>>>>>>> setting config `group.type=consumer` on resource G of GROUP type
>>>>>>>> if there was a share group G in existence, it would be a bit weird.
>>>>>>>> 
>>>>>>>> I wonder whether changing the config name to `new.group.type` would
>>>>>> help.
>>>>>>>> It’s
>>>>>>>> ensuring the type of a new group created.
>>>>>>>> 
>>>>>>>> 2. The behaviour for a DEAD share group is intended to be the same as
>>>> a
>>>>>>>> DEAD
>>>>>>>> consumer group. The group cannot be “reused” again as such, but the
>>>>>> group
>>>>>>>> ID
>>>>>>>> can be used by a new group.
>>>>>>>> 
>>>>>>>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
>>>>>>>> 
>>>>>>>> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC
>>>>>>>> for Admin.deleteShareGroups is DeleteGroups. This is handled by the
>>>>>> group
>>>>>>>> coordinator and it does this by writing control records (a tombstone
>>>> in
>>>>>>>> this case).
>>>>>>>> The KIP doesn’t say anything about this because it’s the same as
>>>>>> consumer
>>>>>>>> groups.
>>>>>>>> Perhaps it would be sensible to add a GroupType to DeleteGroupsRequest
>>>>>> so
>>>>>>>> we can
>>>>>>>> make sure we are deleting the correct type of group. The fact that
>>>> there
>>>>>>>> is not a specific
>>>>>>>> RPC for DeleteShareGroups seems correct to me.
>>>>>>>> 
>>>>>>>> 5. I prefer using “o.a.k.clients.consumer” because it’s already a
>>>> public
>>>>>>>> package and
>>>>>>>> many of the classes and interfaces such as ConsumerRecord are in that
>>>>>>>> package.
>>>>>>>> 
>>>>>>>> I definitely need to add more information about how the Admin
>>>> operations
>>>>>>>> work.
>>>>>>>> I will add a section to the KIP in the next version, later today. This
>>>>>>>> will fill in details for
>>>>>>>> your questions (3) and (4).
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>> 
>>>>>>>>> On 14 Feb 2024, at 18:04, Manikumar <manikumar.re...@gmail.com>
>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Andrew,
>>>>>>>>> 
>>>>>>>>> Thanks for the KIP. A few comments below.
>>>>>>>>> 
>>>>>>>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to
>>>> dynamically
>>>>>>>>> change the configs. Maybe in this case, we should not allow the user
>>>> to
>>>>>>>>> change `group.type` if it's already set.
>>>>>>>>> 2. What's the behaviour after a group transitions into DEAD state. Do
>>>>>> we
>>>>>>>>> add new control records to reset the state? Can we reuse the group
>>>>>> again?
>>>>>>>>> 3. Are we going to write new control records after the
>>>>>>>>> AlterShareGroupOffsets API to reset the state?
>>>>>>>>> 4. Is there any API for DeleteShareGroups? I assume, group
>>>> co-ordinator
>>>>>>>> is
>>>>>>>>> going to handle the API. If so, Does this mean the group co-ordinator
>>>>>>>> also
>>>>>>>>> needs to write control records?
>>>>>>>>> 5. How about using "org.apache.kafka.clients.consumer.share" package
>>>>>> for
>>>>>>>>> new interfaces/classes?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Manikumar
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
> 

Reply via email to