Hi David,
Thanks for reviewing the KIP and your comments.

001: I think the `group.type` is the area of the KIP that has had the most 
comments.
I think you’re right that a better approach would be to make the creation of 
the group
explicit, for users who want that. I have removed `group.type` from this KIP and
I propose to introduce a separate KIP for how to control the type of groups.

002: Makes sense. What I was going for was essentially a GroupMetadata record,
but ConsumerGroupMetadata already exists. I’m perfectly happy creating a 
separate
ShareGroupMetadata and leaving ConsumerGroupMetadata unchanged by this KIP.
I have updated the KIP.

003: In order to be able to fence zombie writes to the state, I am putting the
responsibility of setting the state epoch for a share-partition in the group 
coordinator.
Because the GC already manages the group epoch, using that to initialize the 
state
epoch seems sensible. The SC is not the GC, and it doesn’t know the group epoch
without being told.

004: There is no group expiration for share groups in this KIP.

005: The share coordinator is an internal service which is not directly accessed
by external users. The RPCs that it serves are internal and only issued by 
brokers.

006: When KIP-932 becomes generally available, which I expect is at least a year
away, there will need to be a proper way of turning the capability on. I draw 
the
parallel with KIP-848 for which the configuration evolved right up until just 
before
the cut-off for 3.7 when `group.coordinator.rebalance.protocols` was introduced.
The final mechanism for enabling KIP-848 when it becomes the default in AK 4.0
is still being finalised (KIP-1022).

I expect to create a finalizing KIP for this feature which confirms the actual
configuration that administrators will use to enable it in production clusters.
I expect that will include adding “share” to 
`group.coordinator.rebalance.protocols`
and incrementing `group.version`. I don’t really like having 2 configurations
required, but I think that’s the implication of KIP-848 and KIP-1022.

I am viewing `group.share.enable` as the pre-GA way to enable this feature.
If you’d rather prefer me to use `group.coordinator.rebalance.protocols` in 
KIP-932,
I can do that, but I’d rather save this until we reach preview or GA status.

007: I had not intended to make the assignor interface public at this point,
but really there’s no reason not to. The interface is identical to the 
server-side
assignor used for cosnumer groups. However, it’s a separate interface because
we want to avoid someone using assignors for the wrong group type. I’ve
updated the KIP.

008: The SPSO and SPEO are maintained by the share-partition leader.
The SPSO is persisted using the share coordinator. The SPEO does not need
to be persisted.

009: The ShareAcknowledge API gives a less convoluted way to acknowledge
delivery for situations in which no fetching of records is required. For 
example,
when `KafkaShareConsumer.commitSync` is used, we want to acknowledge without
fetching.

010: Having a hard limit for the number of share sessions on a broker is tricky.
The KIP says that the limit is calculated based on `group.share.max.groups` and
`group.share.max.size`. I think multiplying those together gives the absolute 
limit
but the number required in practice will be somewhat less. I will make the text
a bit clearer here.

In a share group, each consumer assigned partitions which a particular broker 
leads
will need a share session on that broker. It won’t generally be the case that 
every
consumer needs a session on every broker. But the actual number needed on
a broker depends upon the distribution of leadership across the cluster.

The limit is intentionally the theoretical maximum because, without a share 
session,
a consumer cannot acquire records to consume or acknowledge them.

011: It’s a good question. I think it’s safe.

Let’s assume that in a pathological case, the records in a ShareSnapshot
are not contiguous and it is necessary to record a Base/LastOffset for each of
them. Then, it would take around 20 bytes per record to persist its state.
We could store over 50,000 records in 1MB. The configuration limit for
`group.share.record.lock.partition.limit` is 10,000.

012: Yes, we plan to implement it using the CoordinatorRuntime.

013: That makes sense. I will add `share.coordinator.threads`.

014: The `group.share.state.topic.min.isr` is used to set the min ISR
configuration to be used when the share-state partition is automatically
created. I have followed `transaction.state.log.min.isr` which is the closest
analogue I think.

015: You make a good point about the RebalanceTimeoutMs. Removed.

016: I have added some clarifying text about Start/EndPartitionIndex.
Imagine that there’s a topic with 3 partitions. The administrator increases
the partition count to 6. The StartPartitionIndex is 4 and EndPartitionIndex is
6 while the new partitions are being initialised by the share coordinator.

017: Interesting. Given that, would `num-partitions` with a tag protocol:share
be the number of partitions of the share-state topic?

018 and 019: Although these are “group-coordinator-metrics”, they would be
recorded by the share-partition leader. “share-acknowledgement” is the number
of requests to acknowledge, while “record-acknowledgement” is the number of
record acknowledgements. The difference is the batching of acknowledgements
for multiple records in a single request.

020: The SPEO is always bounded by the HWM. For a read-commited share
group, it is also bounded by the LSO.

021: The state epoch is used to fence against writes to earlier versions of
a share-partition state, such as when the SPSO is administratively reset.
There is no leader epoch as such.

Thanks,
Andrew


> On 15 Apr 2024, at 14:25, David Jacot <dja...@confluent.io.INVALID> wrote:
> 
> Hi Andrew,
> 
> Thanks for the KIP. This work is really exciting.
> 
> I finally had a bit of time to go through the KIP. I need to read it a
> second time in order to get into the details. I have noted a few
> points/questions:
> 
> 001: The dynamic config to force the group type is really weird. As you
> said, groups are created on first use and so they are. If we want something
> better, we should rather make the creation of the group explicit.
> 
> 002: It is weird to write a ConsumerGroupMetadata to reserve the group id.
> I think that we should rather have a ShareGroupMetadata for this purpose.
> Similarly, I don't think that we should add a type to the
> ConsumerGroupMetadataValue record. This record is meant to be used by
> "consumer" groups.
> 
> 003: I don't fully understand the motivation for having the
> ShareGroupPartitionMetadata record and the InitializeShareGroupState API
> called from the group coordinator. Could you elaborate a bit more? Isn't it
> possible to lazily initialize the state in the share coordinator when the
> share leader fetches the state for the first time?
> 
> 004: Could you precise how the group expiration will work? I did not see it
> mentioned in the KIP but I may have missed it.
> 
> 005: I would like to ensure that I understand the proposal for the share
> coordinator. It looks like we want it to be an internal service. By this, I
> mean that it won't be directly accessed by external users. Is my
> understanding correct?
> 
> 006: group.share.enable: We should rather use
> `group.coordinator.rebalance.protocols` with `share`.
> 
> 007: SimpleShareAssignor, do we have an interface for it?
> 
> 008: For my understanding, will the SPSO and SPEO bookeeped in the
> Partition and in the Log layer?
> 
> 009: Is there a reason why we still need the ShareAcknowledge API if
> acknowledging can also be done with the ShareFetch API?
> 
> 010: Do we plan to limit the number of share sessions on the share leader?
> The KIP mentions a limit calculated based on group.share.max.groups and
> group.share.max.size but it is quite vague.
> 
> 011: Do you have an idea of the size that ShareSnapshot will use in
> practice? Could it get larger than the max size of the batch within a
> partition (default to 1MB)
> 
> 012: Regarding the share group coordinator, do you plan to implement it on
> top of the CoordinatorRuntime introduced by KIP-848? I hope so in order to
> reuse code.
> 
> 013: Following my previous question, do we need a config similar to
> `group.coordinator.threads` for the share coordinator?
> 
> 014: I am not sure to understand why we need
> `group.share.state.topic.min.isr`. Is the topic level configuration enough
> for this?
> 
> 015: ShareGroupHeartbeat API: Do we need RebalanceTimeoutMs? What's its
> purpose if there is no revocation in the protocol?
> 
> 016: ShareGroupPartitionMetadataValue: What are the StartPartitionIndex and
> EndPartitionIndex?
> 
> 017: The metric `num-partitions` with a tag called protocol does not make
> sense in the group coordinator. The number of partitions is the number of
> __consumer_offsets partitions here.
> 
> 018: Do we need a tag for `share-acknowledgement` if the name is already
> scope to share groups?
> 
> 019: Should we also scope the name of `record-acknowledgement` to follow
> `share-acknowledgement`?
> 
> 020: I suppose that the SPEO is always bounded by the HWM. It may be good
> to call it out. Is it also bounded by the LSO?
> 
> 021: WriteShareGroupState API: Is there a mechanism to prevent zombie share
> leaders from committing wrong state?
> 
> Best,
> David
> 
> 
> On Fri, Apr 12, 2024 at 2:32 PM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
> 
>> Hi,
>> 77. I’ve updated the KIP to use log retention rather than log compaction.
>> The basic ideas of what to persist are unchanged. It makes a few changes:
>> 
>> * It changes the record names: ShareCheckpoint -> ShareSnapshot and
>>  ShareDelta -> ShareUpdate. They’re equivalent, but renaming makes it
>>  simple to check I did an atomic change to the new proposal.
>> * It uses log retention and explicit pruning of elderly records using
>>  ReplicaManager.deleteRecords
>> * It gets rid of the nasty DeltaIndex scheme because we don’t need to worry
>>  about the log compactor and key uniqueness.
>> 
>> I have also changed the ambiguous “State” to “DeliveryState” in RPCs
>> and records.
>> 
>> And I added a clarification about how the “group.type” configuration should
>> be used.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 10 Apr 2024, at 15:33, Andrew Schofield <
>> andrew_schofield_j...@live.com> wrote:
>>> 
>>> Hi Jun,
>>> Thanks for your questions.
>>> 
>>> 41.
>>> 41.1. The partition leader obtains the state epoch in the response from
>>> ReadShareGroupState. When it becomes a share-partition leader,
>>> it reads the share-group state and one of the things it learns is the
>>> current state epoch. Then it uses the state epoch in all subsequent
>>> calls to WriteShareGroupState. The fencing is to prevent writes for
>>> a previous state epoch, which are very unlikely but which would mean
>>> that a leader was using an out-of-date epoch and was likely no longer
>>> the current leader at all, perhaps due to a long pause for some reason.
>>> 
>>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
>>> to discover the initial offset? I’m trying to avoid yet another
>> inter-broker
>>> hop.
>>> 
>>> 42.
>>> 42.1. I think I’ve confused things. When the share group offset is
>> altered
>>> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
>>> update the state epoch. I don’t think it needs to update the group epoch
>>> at the same time (although it could) because the group epoch will have
>>> been bumped when the group became empty. If the share group offset
>>> is altered multiple times when the group remains empty, it would be
>>> harmless if the same state epoch was reused to initialize the state.
>>> 
>>> When the share-partition leader updates the SPSO as a result of
>>> the usual flow of record delivery, it does not update the state epoch.
>>> 
>>> 42.2. The share-partition leader will notice the alteration because,
>>> when it issues WriteShareGroupState, the response will contain the
>>> error code FENCED_STATE_EPOCH. This is supposed to be the
>>> last-resort way of catching this.
>>> 
>>> When the share-partition leader handles its first ShareFetch request,
>>> it learns the state epoch from the response to ReadShareGroupState.
>>> 
>>> In normal running, the state epoch will remain constant, but, when there
>>> are no consumers and the group is empty, it might change. As a result,
>>> I think it would be sensible when the set of share sessions transitions
>>> from 0 to 1, which is a reasonable proxy for the share group
>> transitioning
>>> from empty to non-empty, for the share-partition leader to issue
>>> ReadShareGroupOffsetsState to validate the state epoch. If its state
>>> epoch is out of date, it can then ReadShareGroupState to re-initialize.
>>> 
>>> I’ve changed the KIP accordingly.
>>> 
>>> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
>>> a clear view of which is the correct term. Having reviewed all of the
>>> instances, my view is that BaseOffset should become FirstOffset in
>>> ALL schemas defined in the KIP. Then, BaseOffset is just used in
>>> record batches, which is already a known concept.
>>> 
>>> Please let me know if you agree.
>>> 
>>> 60. I’ve added FindCoordinator to the top level index for protocol
>> changes.
>>> 
>>> 61. OK. I expect you are correct about how users will be using the
>>> console share consumer. When I use the console consumer, I always get
>>> a new consumer group. I have changed the default group ID for console
>>> share consumer to “console-share-consumer” to match the console consumer
>>> better and give more of an idea where this mysterious group has come
>> from.
>>> 
>>> 77. I will work on a proposal that does not use compaction and we can
>>> make a judgement about whether it’s a better course for KIP-932.
>> Personally,
>>> until I’ve written it down and lived with the ideas for a few days, I
>> won’t be
>>> able to choose which I prefer.
>>> 
>>> I should be able to get the proposal written by the end of this week.
>>> 
>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
>>> I prefer to maintain the consistency.
>>> 
>>> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was
>> originally
>>> created from KIP-848. This part of the schema does not apply and I have
>> removed
>>> it. I have also renamed AssignedTopicPartitions to simply
>> TopicPartitions which
>>> aligns with the actual definition of ConsumerGroupHeartbeatResponse.
>>> 
>>> 102. No, I don’t think we do. Removed.
>>> 
>>> 103. I’ve changed the description for the error codes for
>> ShareFetchResponse.
>>> 
>>> 104. Interesting. I have added ErrorMessages to these RPCs as you
>> suggest.
>>> It’s a good improvement for problem determination.
>>> 
>>> 105. The values are reserved in my brain. Actually, 1 is Acquired which
>>> is not persisted, and I have another non-terminal state in mind for 3.
>>> 
>>> 106. A few people have raised the question of whether OffsetAndMetadata
>>> is sensible in this KIP, given that the optional Metadata part comes
>> from when
>>> a regular consumer commits offsets. However, it is correct that there
>> will
>>> never be metadata with a share group. I have changed
>>> the KIP to replace OffsetAndMetadata with Long.
>>> 
>>> 107. Yes, you are right. I have learnt during this process that a
>> version bump
>>> can be a logical not just a physical change to the schema. KIP updated.
>>> 
>>> 108. I would prefer not to extend this RPC for all of the states at this
>> point.
>>> I think there is definitely scope for another KIP focused on
>> administration
>>> of share groups that might want this information so someone could build a
>>> UI and other tools on top. Doing that well is quite a lot of work in its
>> own right
>>> so I would prefer not to do that now.
>>> 
>>> 109.Yes, they’re inconsistent and follow the consumer groups equivalents
>>> which are also inconsistent. In general, the KafkaAdmin.delete…. Methods
>>> use the Map<XXX, KafkaFuture<YYY>> pattern like DeleteShareGroupsResult.
>>> 
>>> Would you prefer that I do that, or remain consistent with consumer
>> groups?
>>> Happy to change it.
>>> 
>>> 110. Again, consumer groups don’t yield that level of detail about
>> epochs.
>>> The MemberDescription does include the assignment, but not the list of
>>> subscribed topic names.
>>> 
>>> 111. I didn’t include GroupState in GroupListing because there’s no
>>> single class that includes the states of all group types.
>>> 
>>> 112. I think it’s good practice for the API to have
>> ListShareGroupOffsetSpec.
>>> It makes evolution and extension of the API much easier. Also, it matches
>>> the precedent set by ListConsumerGroupOffsetSpec.
>>> 
>>> 113. ListConsumerGroupsResults.errors() is the same. I think you just
>> have
>>> to look in the exception details and the same pattern is being followed
>> here.
>>> 
>>> 
>>> Over the next few days, I have committed to writing a proposal for how
>> to persist
>>> share-group state that doesn’t use log compaction.
>>> 
>>> I am also aware that discussion with Justine Olshan on read-committed
>>> isolation level is not yet quite complete.
>>> 
>>> Thanks for the detailed review.
>>> 
>>> Andrew
>>> 
>>>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>> 
>>>> Hi, Andrew,
>>>> 
>>>> Thanks for the reply. A few more comments.
>>>> 
>>>> 41.
>>>> 41.1 How does the partition leader obtain the group epoch to set
>>>> WriteShareGroupStateRequest.StateEpoch?
>>>> 41.2 What's the benefit of having the group coordinator initialize the
>>>> state and the partition leader set the SPSO? It seems simpler to have
>> the
>>>> partition leader initialize both the state and the SPSO together?
>>>> 
>>>> 42.
>>>> 42.1 "I don’t think the group epoch needs to be bumped when the share
>> group
>>>> offset is altered."
>>>> But the part on handling Alter share group offsets says "The share
>>>> coordinator writes a ShareCheckpoint record with the new state epoch to
>> the
>>>> __share_group_state  topic." So, which is correct? We have two paths to
>>>> update the state in the share coordinator, one from the group
>> coordinator
>>>> and another from the partition leader. I thought the benefit of bumping
>> up
>>>> the epoch is to fence off a late request in the previous epoch from
>> another
>>>> path.
>>>> 42.2 When the group coordinator alters the share group offset in share
>>>> coordinator, how does the partition leader know the share group state
>> has
>>>> been altered so that it could clear its in-memory state?
>>>> 
>>>> 47. 56. BaseOffset typically refers to the base offset for the batch and
>>>> can be confusing. FirstOffset is clearer and matches LastOffset.
>>>> 
>>>> 60. Could we include FindCoordinatorRequest in the top level index for
>>>> Kafka protocol changes?
>>>> 
>>>> 61. I think it's probably ok to add time-based expiration later. But
>> using
>>>> a default group in console-share-consumer probably won't help reduce the
>>>> garbage. In the common case, the user of the console consumer likely
>> wants
>>>> to see the recently produced records for verification. If the default
>> group
>>>> doesn't provide that (because of the stale state), the user will likely
>>>> just use a new group. It's true that we don't garbage collect idle
>> topics.
>>>> However,  the share groups are similar to consumers, which does support
>>>> garbage collection. Typically, we read topics more than creating them.
>>>> 
>>>> 77. If the generic compaction is inconvenient, we could use customized
>>>> logic. If we go with that route, option (b) seems cleaner and more
>>>> optimized. Since the share states for all groups fit in memory, we could
>>>> generate snapshots more efficiently than going through compaction.
>> Having a
>>>> separate log per share partition is probably too much overhead. It's
>> more
>>>> efficient to put the state changes for multiple share partitions in a
>>>> single log.
>>>> 
>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
>>>> SessionTimeoutMs?
>>>> 
>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error
>> could
>>>> we have when assigning partitions? What are the corresponding error
>> codes?
>>>> 
>>>> 102. Do we still need
>>>> 
>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>>>> 
>>>> 103. Could we list the error codes separately for
>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>>>> 
>>>> 104. Should we add error message for the errorCode in
>> ShareFetchResponse,
>>>> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
>>>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
>>>> ReadShareGroupOffsetsStateResponse and
>> InitializeShareGroupStateResponse?
>>>> 
>>>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about
>> 1
>>>> and 3? Are we leaving them out intentionally?
>>>> 
>>>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
>>>> remove it from AdminClient and KafkaShareConsumer?
>>>> 
>>>> 107. ListGroupsRequest: Should we bump up the version since it now
>> supports
>>>> a new group type "share"?
>>>> 
>>>> 108. AdminClient.listShareGroupOffsets: Should it expose all the states
>>>> from ReadShareGroupStateResponse, instead of just SPSO?
>>>> 
>>>> 109. DeleteShareGroupOffsetsResult exposes
>>>> public KafkaFuture<Void> partitionResult(final TopicPartition partition)
>>>> DeleteShareGroupsResult exposes
>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
>>>> Should we make them more consistent?
>>>> 
>>>> 110. Should ShareGroupDescription include fields like GroupEpoch,
>>>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>>>> 
>>>> 111. Should GroupListing include GroupState?
>>>> 
>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
>>>> Set<TopicPartition> directly?
>>>> 
>>>> 113. ListShareGroupsResult.errors(): How do we know which group has an
>>>> error?
>>>> 
>>>> Jun
>>>> 
>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
>>>> andrew_schofield_j...@outlook.com> wrote:
>>>> 
>>>>> Hi David,
>>>>> Thanks for your questions.
>>>>> 
>>>>> 70. The Group Coordinator communicates with the Share Coordinator over
>>>>> RPCs.
>>>>> In the general case, it’s an inter-broker call. It is probably
>> possible to
>>>>> optimise
>>>>> for the situation in which the appropriate GC and SC shards are
>>>>> co-located, but the
>>>>> KIP does not delve that deep into potential performance optimisations.
>>>>> 
>>>>> 71. Avoiding collisions would be a good idea, but I do worry about
>>>>> retrospectively
>>>>> introducing a naming convention for groups. I feel that naming
>> conventions
>>>>> will
>>>>> typically be the responsibility of the cluster administrators based on
>>>>> organizational
>>>>> factors, such as the name of an application.
>>>>> 
>>>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
>>>>> correct but
>>>>> the group is the wrong type. The nearest existing error code that gets
>>>>> that across
>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing that a
>> new
>>>>> error code would be better.
>>>>> 
>>>>> 73. The Metadata fields are not used. I have removed them.
>>>>> 
>>>>> 74. The metadata is re-evaluated on every change, but only a subset is
>>>>> relevant
>>>>> for rebalancing. A check is done against the names of the subscribed
>>>>> topics to
>>>>> see if any relevant changes may have occurred. Then the changes which
>>>>> trigger
>>>>> a rebalance are topic creation, deletion, change in partitions, or rack
>>>>> IDs for the
>>>>> replicas. I have updated the KIP to make this more clear.
>>>>> 
>>>>> 75. The assignment is not persisted because it is much less important
>> that
>>>>> the
>>>>> assignment survives a GC change. There’s no need to transfer partitions
>>>>> safely from
>>>>> member to member in the way that is required for consumer groups, so
>> as an
>>>>> optimisation, the assignments for a share group are not persisted. It
>>>>> wouldn’t do any
>>>>> harm, but it just seems unnecessary.
>>>>> 
>>>>> 76. In the event that a consumer tries to acknowledge a record that it
>> now
>>>>> longer
>>>>> has the right to acknowledge, the INVALID_RECORD_STATE error code is
>> used.
>>>>> 
>>>>> If the application uses the KafkaShareConsumer.commitSync method, it
>> will
>>>>> see an InvalidRecordState exception returned. Alternatively, the
>>>>> application can
>>>>> register an acknowledgement commit callback which will be called with
>> the
>>>>> status
>>>>> of the acknowledgements that have succeeded or failed.
>>>>> 
>>>>> 77. I have tried to tread a careful path with the durable
>> share-partition
>>>>> state in this
>>>>> KIP. The significant choices I made are that:
>>>>> * Topics are used so that the state is replicated between brokers.
>>>>> * Log compaction is used to keep a lid on the storage.
>>>>> * Only one topic is required.
>>>>> 
>>>>> Log compaction as it stands is not ideal for this kind of data, as
>>>>> evidenced by
>>>>> the DeltaIndex technique I employed.
>>>>> 
>>>>> I can think of a few relatively simple ways to improve upon it.
>>>>> 
>>>>> a) We could use a customised version of the log compactor for this
>> topic
>>>>> that
>>>>> understands the rules for ShareCheckpoint and ShareDelta records.
>>>>> Essentially,
>>>>> for each share-partition, the latest ShareCheckpoint and any subsequent
>>>>> ShareDelta
>>>>> records must not be cleaned. Anything else can be cleaned. We could
>> then
>>>>> be sure
>>>>> that multiple ShareDelta records with the same key would survive
>> cleaning
>>>>> and we could
>>>>> abandon the DeltaIndex technique.
>>>>> 
>>>>> b) Actually what is required is a log per share-partition. Let’s
>> imagine
>>>>> that we had
>>>>> a share-state topic per topic being consumed in a share group, with the
>>>>> same number
>>>>> of partitions as the topic being consumed. We could write many more
>> deltas
>>>>> between
>>>>> checkpoints, and just take periodic checkpoints to keep control of the
>>>>> storage used.
>>>>> Once a checkpoint has been taken, we could use
>> KafkaAdmin.deleteRecords()
>>>>> to
>>>>> prune all of the older records.
>>>>> 
>>>>> The share-state topics would be more numerous, but we are talking one
>> per
>>>>> topic
>>>>> per share group that it’s being consumed in. These topics would not be
>>>>> compacted.
>>>>> 
>>>>> As you’ll see in the KIP, the Persister interface is intended to be
>>>>> pluggable one day.
>>>>> I know the scheme in the KIP is not ideal. It seems likely to me that
>>>>> future KIPs will
>>>>> improve upon it.
>>>>> 
>>>>> If I can get buy-in for option (b), I’m happy to change this KIP. While
>>>>> option (a) is
>>>>> probably workable, it does seem a bit of a hack to have a customised
>> log
>>>>> compactor
>>>>> just for this topic.
>>>>> 
>>>>> 78. How about DeliveryState? I agree that State is overloaded.
>>>>> 
>>>>> 79. See (77).
>>>>> 
>>>>> Thanks,
>>>>> Andrew
>>>>> 
>>>>> 
>>>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io
>> .INVALID>
>>>>> wrote:
>>>>>> 
>>>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>>>> 
>>>>>> I've finally made it through the KIP, still trying to grok the whole
>>>>> thing.
>>>>>> Sorry if some of my questions are basic :)
>>>>>> 
>>>>>> 
>>>>>> Concepts:
>>>>>> 
>>>>>> 70. Does the Group Coordinator communicate with the Share Coordinator
>>>>> over
>>>>>> RPC or directly in-process?
>>>>>> 
>>>>>> 71. For preventing name collisions with regular consumer groups,
>> could we
>>>>>> define a reserved share group prefix? E.g., the operator defines "sg_"
>>>>> as a
>>>>>> prefix for share groups only, and if a regular consumer group tries to
>>>>> use
>>>>>> that name it fails.
>>>>>> 
>>>>>> 72. When a consumer tries to use a share group, or a share consumer
>> tries
>>>>>> to use a regular group, would INVALID_GROUP_ID make more sense
>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
>>>>>> 
>>>>>> --------
>>>>>> 
>>>>>> Share Group Membership:
>>>>>> 
>>>>>> 73. What goes in the Metadata field for TargetAssignment#Member and
>>>>>> Assignment?
>>>>>> 
>>>>>> 74. Under Trigger a rebalance, it says we rebalance when the partition
>>>>>> metadata changes. Would this be for any change, or just certain ones?
>> For
>>>>>> example, if a follower drops out of the ISR and comes back, we
>> probably
>>>>>> don't need to rebalance.
>>>>>> 
>>>>>> 75. "For a share group, the group coordinator does *not* persist the
>>>>>> assignment" Can you explain why this is not needed?
>>>>>> 
>>>>>> 76. " If the consumer just failed to heartbeat due to a temporary
>> pause,
>>>>> it
>>>>>> could in theory continue to fetch and acknowledge records. When it
>>>>> finally
>>>>>> sends a heartbeat and realises it’s been kicked out of the group, it
>>>>> should
>>>>>> stop fetching records because its assignment has been revoked, and
>> rejoin
>>>>>> the group."
>>>>>> 
>>>>>> A consumer with a long pause might still deliver some buffered
>> records,
>>>>> but
>>>>>> if the share group coordinator has expired its session, it wouldn't
>>>>> accept
>>>>>> acknowledgments for that share consumer. In such a case, is any kind
>> of
>>>>>> error raised to the application like "hey, I know we gave you these
>>>>>> records, but really we shouldn't have" ?
>>>>>> 
>>>>>> 
>>>>>> -----
>>>>>> 
>>>>>> Record Delivery and acknowledgement
>>>>>> 
>>>>>> 77. If we guarantee that a ShareCheckpoint is written at least every
>> so
>>>>>> often, could we add a new log compactor that avoids compacting
>>>>> ShareDelta-s
>>>>>> that are still "active" (i.e., not yet superceded by a new
>>>>>> ShareCheckpoint). Mechnically, this could be done by keeping the LSO
>> no
>>>>>> greater than the oldest "active" ShareCheckpoint. This might let us
>>>>> remove
>>>>>> the DeltaIndex thing.
>>>>>> 
>>>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how
>> about
>>>>>> MessageState? (State is kind of overloaded/ambiguous)
>>>>>> 
>>>>>> 79. One possible limitation with the current persistence model is that
>>>>> all
>>>>>> the share state is stored in one topic. It seems like we are going to
>> be
>>>>>> storing a lot more state than we do in __consumer_offsets since we're
>>>>>> dealing with message-level acks. With aggressive checkpointing and
>>>>>> compaction, we can mitigate the storage requirements, but the
>> throughput
>>>>>> could be a limiting factor. Have we considered other possibilities for
>>>>>> persistence?
>>>>>> 
>>>>>> 
>>>>>> Cheers,
>>>>>> David
>>>>> 
>>>>> 
>>> 
>> 
>> 

Reply via email to