Hi Justine,
Thanks for your comment. Sorry for the delay in responding.

It was not my intent to leave a query unanswered. I have modified the KIP as
a result of the discussion and I think perhaps I didn’t neatly close off the
email thread.

In summary:

* The share-partition leader does not maintain an explicit cache of records
that it has fetched. When it fetches records, it does “shallow” iteration to
look at the batch headers only so that it understands at least the base/last
offset of the records within. It is left to the consumers to do the “deep”
iteration of the record batches they fetch.

* It may sometimes be necessary to re-fetch records for redelivery. This is
essentially analogous to two consumer groups independently fetching the same
records today. We will be relying on the efficiency of the page cache.

* The zero-copy optimisation is not possible for records fetched for
consumers in share groups. The KIP does not affect the use of the zero-copy
optimisation for any scenarios in which it currently applies (this was not
true in one earlier version of the KIP).

* I share your concern about memory usage, partly because of the producer
state management area. To keep a lid on memory use, the number of share
groups, the number of members of each share group, and the number of
in-flight messages per partition in a share group are all limited. The aim is
to get the in-memory state to be nice and compact, probably at the expense of
throughput. Over time, I’m sure we’ll optimise and get a bit more headroom.
Limits like these cannot easily be applied retrospectively, so the limits are
there right at the start.

* I have reworked the section on read-committed isolation level, and the
complexity and memory usage of the approach are both significantly improved.
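
To make the “shallow” iteration in the first point concrete, here is a
minimal sketch, not the broker's actual code. It assumes the v2 record batch
header layout (8-byte base offset, 4-byte batch length counting the bytes
after that field, 4-byte last offset delta at byte 23). The names
ShallowBatchScan, OffsetRange and putFakeBatch are hypothetical, and the
test batches are zero-filled placeholders rather than valid Kafka batches.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ShallowBatchScan {
    // Assumed field positions within a v2 record batch header.
    static final int BASE_OFFSET_POS = 0;        // int64
    static final int BATCH_LENGTH_POS = 8;       // int32, bytes after this field
    static final int LAST_OFFSET_DELTA_POS = 23; // int32
    static final int LOG_OVERHEAD = 12;          // base offset + batch length
    // Bytes needed to read up to and including lastOffsetDelta.
    // A real v2 header is longer; this sketch only needs these fields.
    static final int MIN_HEADER_BYTES = 27;

    /** Offset range covered by one batch, taken from its header only. */
    public static final class OffsetRange {
        public final long baseOffset;
        public final long lastOffset;

        OffsetRange(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }

        @Override
        public String toString() {
            return "[" + baseOffset + ".." + lastOffset + "]";
        }
    }

    /** Walks batch headers without decoding any individual record. */
    public static List<OffsetRange> scan(ByteBuffer buf) {
        List<OffsetRange> ranges = new ArrayList<>();
        int pos = buf.position();
        while (buf.limit() - pos >= MIN_HEADER_BYTES) {
            long baseOffset = buf.getLong(pos + BASE_OFFSET_POS);
            int batchLength = buf.getInt(pos + BATCH_LENGTH_POS);
            int totalBytes = LOG_OVERHEAD + batchLength;
            // Stop on a corrupt length or a batch truncated by the fetch size.
            if (batchLength < MIN_HEADER_BYTES - LOG_OVERHEAD
                    || buf.limit() - pos < totalBytes) {
                break;
            }
            int lastOffsetDelta = buf.getInt(pos + LAST_OFFSET_DELTA_POS);
            ranges.add(new OffsetRange(baseOffset, baseOffset + lastOffsetDelta));
            pos += totalBytes; // skip the records themselves
        }
        return ranges;
    }

    /** Test helper: writes a placeholder batch with only the fields used above. */
    public static void putFakeBatch(ByteBuffer buf, long baseOffset,
                                    int lastOffsetDelta, int payloadBytes) {
        int start = buf.position();
        int batchLength = (MIN_HEADER_BYTES - LOG_OVERHEAD) + payloadBytes;
        buf.putLong(start + BASE_OFFSET_POS, baseOffset);
        buf.putInt(start + BATCH_LENGTH_POS, batchLength);
        buf.putInt(start + LAST_OFFSET_DELTA_POS, lastOffsetDelta);
        buf.position(start + LOG_OVERHEAD + batchLength);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        putFakeBatch(buf, 100L, 4, 20);
        putFakeBatch(buf, 105L, 9, 50);
        buf.flip();
        System.out.println(scan(buf));
    }
}
```

The scan touches a fixed number of bytes per batch regardless of how many
records the batch holds, which is why tracking base/last offsets this way
does not require holding decoded records in heap.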

I hope this answers your question.

Thanks,
Andrew


> On 18 Mar 2024, at 20:47, Justine Olshan <jols...@confluent.io.INVALID> wrote:
> 
> Hey Andrew,
> 
> I noticed you started the voting thread, but there seem to be a few
> questions that were not answered. One was Jun's about memory usage:
>> How much additional heap memory will the server use? Do we need to cache
>> records in heap? If so, is the cache bounded?
> 
> Your response was
>> This area needs more work. Using a share group surely gets the broker to
>> do more manipulation of the data that it fetches than a regular consumer.
>> I want to minimise this and need to research before providing a
>> comprehensive answer. I suspect zero-copy is lost and that we do not cache
>> records in heap. I will confirm later on.
> 
> I am also concerned about memory usage from my producer state management
> work, so I want to make sure we have thought about it here -- not just in
> the case Jun mentioned but generally.
> 
> Likewise, we have seen issues with large consumer groups and too many
> producer IDs. Are there any concerns with an analogous situation with too
> many share group members or share groups? Are there any ways we try to
> handle this or mitigate risks with respect to memory usage and client
> connections (wrt rebalances, for example)?
> 
> Thanks,
> 
> Justine
> 
> On Fri, Mar 8, 2024 at 12:51 AM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
> 
>> Hi Manikumar,
>> Thanks for your queries.
>> 
>> 1) Delivery count is added to the ConsumerRecord class so that a consumer
>> can tell how often a record has been processed. I imagine that some
>> applications might want to take different actions based on whether a
>> record has previously failed. This enables richer error handling for bad
>> records. In the future, I plan another KIP to enhance error handling.
>> 
>> 2) It is only possible to delete a share group which is empty. As a
>> result, all well-behaved consumers will have closed their share sessions.
>> After a short while, the share-partition leaders will discard the
>> share-partition information from memory.
>> 
>> In the presence of badly behaved consumers, a consumer would have to
>> pretend to be a member of a share group. There are several cases:
>>
>> a) If the share-partition leader still has in-memory state for the deleted
>> share-group, it will continue to fetch records but it will be fenced by
>> the share coordinator when it attempts to write its persistent state.
>>
>> b) If the share-partition leader does not have in-memory state, it will
>> attempt to read it from the share coordinator and this will fail.
>> 
>> 3) I will add metrics for the share coordinator today. This was an
>> omission. Thanks for catching it.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 6 Mar 2024, at 17:53, Manikumar <manikumar.re...@gmail.com> wrote:
>>> 
>>> Hi Andrew,
>>> 
>>> Thanks for the updated KIP. Few queries below:
>>> 
>>> 1. What is the use-case of deliveryCount in ShareFetchResponse?
>>> 2. When deleting share groups, do we need to clean any in-memory state
>>> from share-partition leaders?
>>> 3. Any metrics for the share-coordinator?
>>> 
>>> Thanks
>>> Manikumar
>>> 
>>> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
>>> andrew_schofield_j...@outlook.com> wrote:
>>> 
>>>> Hi Manikumar,
>>>> Thanks for your comments.
>>>> 
>>>> 1. I believe that in general, there are not situations in which a
>>>> dynamic config change is prevented because of the existence of a
>>>> resource. So, if we prevented setting config `group.type=consumer` on
>>>> resource G of GROUP type if there was a share group G in existence, it
>>>> would be a bit weird.
>>>>
>>>> I wonder whether changing the config name to `new.group.type` would
>>>> help. It’s ensuring the type of a new group created.
>>>> 
>>>> 2. The behaviour for a DEAD share group is intended to be the same as a
>>>> DEAD consumer group. The group cannot be “reused” again as such, but the
>>>> group ID can be used by a new group.
>>>> 
>>>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
>>>> 
>>>> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC
>>>> for Admin.deleteShareGroups is DeleteGroups. This is handled by the
>>>> group coordinator and it does this by writing control records (a
>>>> tombstone in this case). The KIP doesn’t say anything about this because
>>>> it’s the same as consumer groups. Perhaps it would be sensible to add a
>>>> GroupType to DeleteGroupsRequest so we can make sure we are deleting the
>>>> correct type of group. The fact that there is not a specific RPC for
>>>> DeleteShareGroups seems correct to me.
>>>> 
>>>> 5. I prefer using “o.a.k.clients.consumer” because it’s already a public
>>>> package and many of the classes and interfaces such as ConsumerRecord
>>>> are in that package.
>>>>
>>>> I definitely need to add more information about how the Admin operations
>>>> work. I will add a section to the KIP in the next version, later today.
>>>> This will fill in details for your questions (3) and (4).
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>>> On 14 Feb 2024, at 18:04, Manikumar <manikumar.re...@gmail.com> wrote:
>>>>> 
>>>>> Hi Andrew,
>>>>> 
>>>>> Thanks for the KIP. A few comments below.
>>>>> 
>>>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to dynamically
>>>>> change the configs. Maybe in this case, we should not allow the user to
>>>>> change `group.type` if it's already set.
>>>>> 2. What's the behaviour after a group transitions into DEAD state? Do
>>>>> we add new control records to reset the state? Can we reuse the group
>>>>> again?
>>>>> 3. Are we going to write new control records after the
>>>>> AlterShareGroupOffsets API to reset the state?
>>>>> 4. Is there any API for DeleteShareGroups? I assume the group
>>>>> co-ordinator is going to handle the API. If so, does this mean the
>>>>> group co-ordinator also needs to write control records?
>>>>> 5. How about using "org.apache.kafka.clients.consumer.share" package
>>>>> for new interfaces/classes?
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Manikumar
>>>> 
>>>> 
>> 
>> 
