Hi David,
Thanks for your response.

001: OK, I'll include converting an empty classic or consumer group to
a share group in the thinking for the future KIP on group management that
I have in mind for later in 2024.

004: Added to the KIP explicitly.

006: That description helps.

I have changed the KIP so that `group.share.enable` is only an internal
configuration that we use for tests, meaning that this configuration has
been removed entirely from the KIP. The user enables share groups by
including "share" in `group.coordinator.rebalance.protocols`. When we get
to a production-ready release of KIP-932, `group.version` will be used to
enable the new records that we persist. I hope this is an acceptable approach.

007: There are two questions here I think. First, would there ever be an
assignor that would work properly for both consumer groups and share groups.
I suppose the answer is no.

Second, is the interface for these two types of assignors the same? The
answer is that it is. I think we should exploit this in the code.

Let's say there's an entirely separate ShareGroupPartitionAssignor which
has no relationship with PartitionAssignor. Then, we need to create independent
and almost identical implementations of the machinery in
org.apache.kafka.coordinator.group.common such as TargetAssignmentBuilder.
This is undesirable code duplication.

Here's my suggestion.
a) o.a.k.coordinator.group.assignor.PartitionAssignor is no longer implemented
directly by any assignors.
b) o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor
and o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor extend
this interface. By implementing one of these interfaces in an assignor,
you're choosing a group type.
c) o.a.k.coordinator.group.assignor.Range/UniformAssignor are modified
to implement ConsumerGroupPartitionAssignor.
d) o.a.k.coordinator.group.share.SimpleAssignor implements
ShareGroupPartitionAssignor.
e) Wherever the broker code cares which kind of assignor it gets, it uses the
appropriate group-specific interface. But the code that calculates the
assignments is generic and uses PartitionAssignor.

017: Yes, I agree. I've added num-partitions in the share-coordinator-metrics
group.

018/019: I have moved the share group-specific metrics recorded by the
SPL into a separate group. Please take another look at the table of
broker metrics.

021: I think it is preferable to have a fencing mechanism to protect against
zombie share-partition leaders. The question is how to do it in a robust
way without too much overhead. I think there's a way using existing concepts.

Each partition leader has a leader epoch that increments when a new leader is
elected. Reads and writes of the share-group state from the share coordinator 
can
use the leader epoch as a fence. When a new leader is elected, it reads the 
state
providing its new, higher leader epoch. The share coordinator notices that the
leader epoch has increased and will no longer honour requests from a lower 
epoch.
The share coordinator includes the leader epoch in records it writes to the
share-group state topic. When the state epoch is bumped, the leader epoch is
initialized to -1 which means that the leader epoch for this state epoch is
not yet set. When the SPL calls the share coordinator, it provides the leader
epoch and the share coordinator can initialise its copy of the leader epoch.

I have updated the KIP accordingly.

022: `share.coordinator.state.topic.*` works for me. Configs changed.

023: We assume the clients will do the right thing. If they do not, the effect
is essentially that the desired balance of consumers to partitions is not
being honoured. That's going to happen temporarily as rebalancing occurs
for a group with lots of partitions and members anyway.


Thanks,
Andrew

> On 25 Apr 2024, at 14:53, David Jacot <dja...@confluent.io.INVALID> wrote:
> 
> Hi Andrew,
> 
> Thanks for your responses and sorry for my late reply.
> 
> 001: Makes sense. One thing that we could consider here is to allow
> converting an empty classic or consumer group to a share group. If already
> do this between empty classic and consumer groups.
> 
> 004: I see. It would be great to call it out in the KIP.
> 
> 006: I view `group.coordinator.rebalance.protocols` and `group.version` as
> two different concepts. The former is purely here to enable/disable
> protocols where the latter is mainly here to gate the versions of records
> that we persist. Both are indeed required to enable the new consumer group
> protocol. The reason is that we could imagine having a new version to the
> `group.version` for different purposes (e.g. queues) but one may not want
> to enable the new consumer protocol. Unless we have a strong reason not to
> use these two, I would use them from day 1. This is actually something that
> I got wrong in KIP-848, I think.
> 
> 007: I see that you extend ShareGroupPartitionAssignor from
> PartitionAssignor. I wonder if we should separate them because it means
> that a ShareGroupPartitionAssignor could be accidentally used by a consumer
> group. They won't be meant to be, I think.
> 
> 009: I see. Thanks for clarifying.
> 
> 017: I think that it should rather be num-partitions but in a different
> group, the group of the share coordinator.
> 
> 018/019: Would it be better to put them in a different group then (e.g.
> share-coordinator-metrics)? This is confusing to have metrics crossing
> group boundaries in my opinion.
> 
> 021: "The state epoch is used to fence against writes to earlier versions
> of a share-partition state". I understand that this protects the share
> leader from overriding the state when it was altered by the admin APIs.
> However, it does not seem to protect the state from being altered by, for
> instance, a zombie share leader. A simple example is when the leadership of
> a partition is moved from broker A to broker B. In this case, broker A
> should not be permitted to alter the state anymore. The trick is that the
> former leader may not be aware of the change yet. I think that the share
> coordinator should prevent this. Don't you agree?
> 
> 022: Would it make sense to rename `group.share.state.topic.*` configs to
> use `share.coordinator.*` prefix as they are effectively share coordinator
> matters.
> 
> 023: Is there a mechanism to prevent fenced members from the group
> coordinator point of view to not fetch from the share leaders? Or do we
> assume that clients will do the right thing in this case? By this, I meant
> that they will stop fetching immediately. This part is not clear in the KIP
> or I missed it.
> 
> Best,
> David
> 
> On Thu, Apr 25, 2024 at 1:19 PM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
> 
>> Hi Jun,
>> Thanks for the response.
>> 
>> 123. Of course, ShareGroupHearbeat started off as ConsumerGroupHeartbeat
>> and then unnecessary fields were removed. In the network issue case,
>> there is not currently enough state being exchanged to be sure an
>> assignment
>> was received.
>> 
>> Rather than add the group epoch to the ShareGroupHeartbeat, I have decided
>> to go for TopicPartitions in ShareGroupHeartbeatRequest which mirrors
>> ConsumerGroupHeartbeatRequest. It means the share group member does
>> confirm the assignment it is using, and that can be used by the GC to
>> safely
>> stop repeating the assignment in heartbeat responses.
>> 
>> 125. Ah, yes. This is indeed something possible with a consumer group
>> and share groups should support it too. This does of course imply that
>> ShareGroupPartitionMetadataValue needs an array of partitions, not
>> just the number.
>> 
>> 140. Yes, good spot. There is an inconsistency here in consumer groups
>> where you can use AdminClient.deleteConsumerGroupOffsets at the
>> partition level, but kafka-consumer-groups.sh --delete only operates
>> at the topic level.
>> 
>> Personally, I don’t think it’s sensible to delete offsets at the partition
>> level only. You can reset them, but if you’re actively using a topic with
>> a share group, I don’t see why you’d want to delete offsets rather than
>> reset. If you’ve finished using a topic with a share group and want to
>> clean
>> up, use delete.
>> 
>> So, I’ve changed the AdminClient.deleteConsumerGroupOffsets to be
>> topic-based and the RPCs behind it.
>> 
>> The GC reconciles the cluster state with the ShareGroupPartitionMetadata
>> to spot deletion of topics and the like. However, when the offsets for
>> a topic were deleted manually, the topic very like still exists so
>> reconciliation
>> alone is not going to be able to continue an interrupted operation that
>> has started. So, I’ve added DeletingTopics back into
>> ShareGroupPartitionMetadata for this purpose. It’s so failover of a GC
>> can continue where it left off rather than leaving fragments across the
>> SCs.
>> 
>> 141. That is not required. Because this is not a compacted topic, it is
>> not necessary to write tombstones for every key. As long as there is a
>> clear and unambiguous record for the deletion of the group, that is enough.
>> The tombstone for ShareGroupPartitionMetadata is theoretically not
>> required but it’s a single record, rather than one per member, so I prefer
>> to leave it as a record that the interactions with the SC have been
>> completed.
>> 
>> 142.
>> 142.1. It will prompt the user to confirm they want to continue.
>> This is in common with `kafka-consumer-groups.sh` which historically
>> has defaulted to --dry-run behaviour, but is due to change to prompting
>> if neither --dry-run nor --execute is specified “in a future major
>> release”.
>> 
>> 142.2. It should support partition-level reset, but only topic-level
>> delete.
>> I have updated the usage text accordingly. This is in common with
>> kafka-consumer-groups.sh.
>> 
>> 142.3. --dry-run displays the operation that would be executed.
>> 
>> 142.4. The valid values are: Dead, Empty, Stable. Added to the
>> usage text.
>> 
>> 143. DescribeShareGroupOffsets is served by the group coordinator
>> for this kind of reason.
>> 
>> 144. That’s the default. If you haven’t asked to release or reject, it
>> accepts.
>> This is analogous to fetching and committing offsets in a consumer group.
>> 
>> 145. I think this is a good idea, but I would prefer to defer it until a
>> future
>> metrics KIP that I have planned. In KIP-932, I have added basic metrics
>> only.
>> For example, you’ll see that there’s no concept of lag yet, which surely
>> will have relevance for share groups. I plan to create and deliver the
>> metrics KIP before share groups are declared ready for production.
>> I want the new metrics to be developed with the experience of running
>> the code.
>> 
>> 146. Milliseconds. KIP updated.
>> 
>> 147. There is a membership state machine in the client that
>> changes states as the ShareGroupHeartbeat requests and responses
>> flow. The duration of a rebalance will be shorter from the point of
>> view of the share-group consumer because it doesn’t have to worry about
>> rebalance callbacks and committing offsets as the partitions move
>> around, but the overall flow is very similar. So, it’s the state
>> transitions
>> that drive the collection of the rebalance metrics.
>> 
>> 148. Strangely, none of the existing uses of records-per-request-avg
>> actually have records-per-request-max. I tend to err on the side of
>> consistency, but can’t think of any reason not to add this. Done.
>> 
>> 149. There are several error codes for WriteShareGroupStateResponse:
>> 
>> NOT_COORDINATOR - This is not the share coordinator you’re looking for.
>> COORDINATOR_NOT_AVAILABLE - The SC can’t
>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the topic.
>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this group.
>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for this
>> topic-partition.
>> FENCED_STATE_EPOCH - The write has the wrong state epoch.
>> INVALID_REQUEST - There was a problem with the request.
>> 
>> 
>> Thanks,
>> Andrew
>> 
>> 
>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID> wrote:
>>> 
>>> Hi, Andrew,
>>> 
>>> Thanks for the response.
>>> 
>>> 123. I thought the network issue can be covered with the group epoch.
>>> Basically, if the assignment is to be changed, GC bumps up the epoch
>> first,
>>> but doesn't expose the new epoch to members until the assignment is
>>> complete (including initializing the sharePartitionState). Once the
>>> assignment is complete, GC includes the bumped up epoch and the new
>>> assignment in the heartbeatResponse. If the next heartbeat Request
>> includes
>>> the new epoch, it means that the member has received the new assignment
>> and
>>> GC can exclude the assignment in the heartbeatResponse.
>>> 
>>> 125. "If AlterShareGroupOffsets is called for a topic-partition which is
>>> not yet in ShareGroupPartitionMetadataValue, it’s not really part of the
>>> group yet. So I think it’s permitted to fail the AlterShareGroupOffsets
>>> because the topic-partition would not be part of ListShareGroupOffsets
>>> result."
>>> A user may want to initialize SPSO (e.g. based on timestamp) before the
>>> application is first used. If we reject AlterShareGroupOffsets when
>>> ShareGroupPartitionMetadataValue is empty, the user will be forced to
>> start
>>> the application with the wrong SPSO first and then reset it, which will
>> be
>>> inconvenient.
>>> 
>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of SPSO for
>>> individual partitions, but ShareGroupPartitionMetadataValue only tracks
>>> consecutive partitions.
>>> 
>>> 141. Delete share group: Should GC also write a tombstone for each
>>> ShareGroupMemberMetadata record?
>>> 
>>> 142. kafka-share-groups.sh
>>> 142.1 What happens if neither --dry-run nor --execute is specified for
>>> reset-offsets?
>>> 142.2 Should it support --delete-offsets and --reset-offsets at the
>>> partition level to match the AdminClient api?
>>> 142.3 How useful is --dry-run? The corresponding RPCs don't carry the
>>> dry-run flag.
>>> 142.4 --state [String]: What are the valid state values?
>>> 
>>> 143. DescribeShareGroupOffsets is served by the share-partition leader.
>> How
>>> could a user describe the share group offset when there is no member?
>>> 
>>> 144. kafka-console-share-consumer.sh: Is there an option to accept
>> consumed
>>> messages?
>>> 
>>> 145. It would be useful to add a broker side metric that measures the
>>> pressure from group.share.partition.max.record.locks, e.g. the fraction
>> of
>>> the time that SPL is blocked because
>> group.share.partition.max.record.locks
>>> is reached.
>>> 
>>> 146. heartbeat-response-time-max: What's the unit?
>>> 
>>> 147. rebalance related metrics: How does a share consumer know when there
>>> is a rebalance and how does it measure the rebalance time?
>>> 
>>> 148. records-per-request-avg: Should we pair it with
>>> records-per-request-max?
>>> 
>>> 149. If the shareGroupState is not available, what error code is used in
>>> WriteShareGroupStateResponse?
>>> 
>>> Jun
>>> 
>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield <
>> andrew_schofi...@live.com>
>>> wrote:
>>> 
>>>> Hi Jun,
>>>> Thanks for your reply.
>>>> 
>>>> 123. The GC only sends the assignment in ShareGroupHeartbeat
>>>> response if the member has just joined, or the assignment has been
>>>> recalculated. This latter condition is met when the GC fails over.
>>>> 
>>>> With a consumer group, it works a little differently. The heartbeating
>>>> occurs asynchronously with the reconciliation process in the client.
>>>> The client has reconciliation callbacks, as well as offsets to commit
>>>> before it can confirm that revocation has occured. So, it might take
>>>> multiple heartbeats to complete the installation of a new target
>>>> assignment in the client.
>>>> 
>>>> With a share group, it’s more brutal. The GC sends the assignment
>>>> in a heartbeat response, and the client is assumed to have acted on
>>>> It immediately. Since share group assignment is really about balancing
>>>> consumers across the available partitions, rather than safely handing
>>>> ownership of partitions between consumers, there is less coordination.
>>>> 
>>>> I wonder whether there is one situation that does need considering.
>>>> If the network connection between the client and the GC is lost, it is
>>>> possible that a response containing the assignment is lost. Then,
>>>> the connection will be reestablished, and the assignment will only be
>>>> sent when it’s recalculated. Adding the equivalent of
>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one
>>>> way to close that.
>>>> 
>>>> 125. Yes, you are right. The reconciliation that I describe in the
>>>> answer to (54) below is related. If AlterShareGroupOffsets is called
>>>> for a topic-partition which is not yet in
>> ShareGroupPartitionMetadataValue,
>>>> it’s not really part of the group yet. So I think it’s permitted to fail
>>>> the AlterShareGroupOffsets because the topic-partition would not be
>>>> part of ListShareGroupOffsets result.
>>>> 
>>>> If AlterShareGroupOffsets is called for a topic-partition which is in
>>>> ShareGroupPartitionMetadataValue, then just calling
>>>> InitializeShareGroupState to set its offset would be sufficient without
>>>> writing ShareGroupPartitionMetadata again.
>>>> 
>>>> 138. Added.
>>>> 
>>>> 139. This week, I have been experimenting with using the existing
>>>> consumer metrics with share consumer code. That was my plan to get
>>>> started with metrics. While they work to some extent, I am not entirely
>>>> happy with the result.
>>>> 
>>>> I have added a basic set of share consumer-specific metrics to KIP-932
>>>> which I think is a step in the right direction. I anticipate a future
>> KIP
>>>> that defines many more metrics and tackles concepts such as what
>>>> “lag” means for a share group. The metrics I’ve included are simply
>>>> counting and measuring the operations, which is a good start.
>>>> 
>>>> 54. I think either with or without InitializingTopics/DeletingTopics in
>>>> ShareGroupPartitionMetadataValue works. However, I think there is
>>>> a deeper point behind what you said. The GC needs to be responsive
>>>> to topic creation and deletion, and addition of partitions in cases
>> where
>>>> it doesn’t agree with InitializingTopics/DeletingTopics. So, we are
>>>> probably not saving anything by having those fields. As a result, I have
>>>> removed InitializingTopics and DeletingTopics and updated the KIP
>>>> accordingly. The GC needs to reconcile its view of the initialised
>>>> topic-partitions with ShareGroupPartitionMetadataValue and it will
>>>> initialize or delete share-group state accordingly.
>>>> 
>>>> 
>>>> I have made one more change to the KIP. The SharePartitionAssignor
>>>> interface has been renamed to
>>>> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor and it
>>>> now extends the PartitionAssignor interface. It’s essentially a marker
>>>> of which partition assignors can be used with share groups.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>>> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>> 
>>>>> Hi, Andrew,
>>>>> 
>>>>> Thanks for the reply.
>>>>> 
>>>>> 123. "it doesn’t need to confirm the assignment back to the GC."
>>>>> Hmm, I thought the member needs to confirm the assignment to GC to
>>>>> avoid GC including the assignment in the heartbeat response
>>>> continuously. I
>>>>> assume this is done by including the new group epoch in the heartbeat
>>>>> response.
>>>>> 
>>>>> 125. It's possible that the share partition has never been initialized
>>>> when
>>>>> AlterShareGroupOffsets is called. If GC doesn't write
>>>>> ShareGroupPartitionMetadata and the GC fails over, it would
>> reinitialize
>>>>> the share partition and lose the effect of AlterShareGroupOffsets. If
>> the
>>>>> partition has already been initialized and it's recorded
>>>>> in ShareGroupPartitionMetadata, it's possible not to write
>>>>> ShareGroupPartitionMetadata again when handling AlterShareGroupOffsets.
>>>>> 
>>>>> 138. Could we add the flow in GC when a topic is deleted?
>>>>> 
>>>>> 139. Do we need to add any metrics in KafkaShareConsumer?
>>>>> 
>>>>> 54. "I don’t think there is any redundancy. The
>> ShareGroupMemberMetadata
>>>>> does include a list of subscribed topics. However, if there is a period
>>>> of
>>>>> time in which no members are subscribed to a particular topic, it does
>>>> not
>>>>> mean that the topic’s ShareGroupState should be immediately removed,
>> but
>>>> it
>>>>> does mean that there will be no ShareGroupMemberMetadata records
>>>> containing
>>>>> that topic."
>>>>> I am still trying to understand the value of InitializingTopics and
>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They are used to
>>>>> remember the intention of an operation. However, GC still needs to
>> handle
>>>>> the case when the intention is not safely recorded. If GC wants to
>>>>> initialize a new topic/partition, a simpler approach is for it to send
>> an
>>>>> InitializeShareGroupState to the share coordinator and after receiving
>> a
>>>>> success response, write ShareGroupPartitionMetadataValue with the
>>>>> initialized partition included in InitializedTopics. This saves a
>> record
>>>>> write. It's possible for GC to fail in between the two steps. On
>>>> failover,
>>>>> the new GC just repeats the process. The case that you mentioned above
>>>> can
>>>>> still be achieved. If a partition is in InitializedTopics of
>>>>> ShareGroupPartitionMetadataValue, but no member subscribes to it, we
>> can
>>>>> still keep the ShareGroupState as long as the topic still exists. The
>>>> same
>>>>> optimization could be applied to DeletingTopics too.
>>>>> 
>>>>> Jun
>>>>> 
>>>>> 
>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <
>>>> andrew_schofi...@live.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Jun,
>>>>>> Thanks for the reply.
>>>>>> 
>>>>>> 123. Every time the GC fails over, it needs to recompute the
>> assignment
>>>>>> for every member. However, the impact of re-assignment is not that
>>>> onerous.
>>>>>> If the recomputed assignments are the same, which they may well be,
>>>> there
>>>>>> is no impact on the members at all.
>>>>>> 
>>>>>> On receiving the new assignment, the member adjusts the
>> topic-partitions
>>>>>> in its share sessions, removing those which were revoked and adding
>>>> those
>>>>>> which were assigned. It is able to acknowledge the records it fetched
>>>> from
>>>>>> the partitions which have just been revoked, and it doesn’t need to
>>>> confirm
>>>>>> the assignment back to the GC.
>>>>>> 
>>>>>> 125. I don’t think the GC needs to write ShareGroupPartitionMetadata
>>>>>> when processing AlterShareGroupOffsets. This is because the operation
>>>>>> happens as a result of an explicit administrative action and it is
>>>> possible
>>>>>> to return a specific error code for each topic-partition. The cases
>>>> where
>>>>>> ShareGroupPartitionMetadata is used are when a topic is added or
>> removed
>>>>>> from the subscribed topics, or the number of partitions changes.
>>>>>> 
>>>>>> 130. I suppose that limits the minimum lock timeout for a cluster to
>>>>>> prevent
>>>>>> a group from having an excessively low value. Config added.
>>>>>> 
>>>>>> 131. I have changed it to group.share.partition.max.record.locks.
>>>>>> 
>>>>>> 136.  When GC failover occurs, the GC gaining ownership of a partition
>>>> of
>>>>>> the __consumer_offsets topic replays the records to build its state.
>>>>>> In the case of a share group, it learns:
>>>>>> 
>>>>>> * The share group and its group epoch (ShareGroupMetadata)
>>>>>> * The list of members (ShareGroupMemberMetadata)
>>>>>> * The list of share-partitions (ShareGroupPartitionMetadata)
>>>>>> 
>>>>>> It will recompute the assignments in order to respond to
>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the group epoch.
>>>>>> 
>>>>>> I will update the KIP accordingly to confirm the behaviour.
>>>>>> 
>>>>>> 137.1: The GC and the SPL report the metrics in the
>>>>>> group-coordinator-metrics
>>>>>> group. Unlike consumer groups in which the GC performs offset commit,
>>>>>> the share group equivalent is performed by the SPL. So, I’ve grouped
>> the
>>>>>> concepts which relate to the group in group-coordinator-metrics.
>>>>>> 
>>>>>> The SC reports the metrics in the share-coordinator-metrics group.
>>>>>> 
>>>>>> 137.2: There is one metric in both groups - partition-load-time. In
>> the
>>>> SC
>>>>>> group,
>>>>>> it refers to the time loading data from the share-group state topic so
>>>> that
>>>>>> a ReadShareGroupState request can be answered. In the GC group,
>>>>>> it refers to the time to read the state from the persister. Apart from
>>>> the
>>>>>> interbroker RPC latency of the read, they’re likely to be very close.
>>>>>> 
>>>>>> Later, for a cluster which is using a custom persister, the
>>>>>> share-coordinator
>>>>>> metrics would likely not be reported, and the persister would have its
>>>> own
>>>>>> metrics.
>>>>>> 
>>>>>> 137.3: Correct. Fixed.
>>>>>> 
>>>>>> 137.4: Yes, it does include the time to write to the internal topic.
>>>>>> I’ve tweaked the description.
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi, Andrew,
>>>>>>> 
>>>>>>> Thanks for the reply.
>>>>>>> 
>>>>>>> 123. "The share group does not persist the target assignment."
>>>>>>> What's the impact of this? Everytime that GC fails over, it needs to
>>>>>>> recompute the assignment for every member. Do we expect the member
>>>>>>> assignment to change on every GC failover?
>>>>>>> 
>>>>>>> 125. Should the GC also write ShareGroupPartitionMetadata?
>>>>>>> 
>>>>>>> 127. So, group epoch is only propagated to SC when
>>>>>>> InitializeShareGroupState request is sent. This sounds good.
>>>>>>> 
>>>>>>> 130. Should we have a group.share.min.record.lock.duration.ms to
>> pair
>>>>>> with
>>>>>>> group.share.max.record.lock.duration.ms?
>>>>>>> 
>>>>>>> 131. Sounds good. The name group.share.record.lock.partition.limit
>>>>>> doesn't
>>>>>>> seem very intuitive. How about something
>>>>>>> like group.share.partition.max.records.pending.ack?
>>>>>>> 
>>>>>>> 136. Could we describe the process of GC failover? I guess it needs
>> to
>>>>>>> compute member reassignment and check if there is any new
>>>> topic/partition
>>>>>>> matching the share subscription. Does it bump up the group epoch?
>>>>>>> 
>>>>>>> 137. Metrics:
>>>>>>> 137.1 It would be useful to document who reports each metric. Is it
>> any
>>>>>>> broker, GC, SC or SPL?
>>>>>>> 137.2 partition-load-time: Is that the loading time at SPL or SC?
>>>>>>> 137.3 "The time taken in milliseconds to load the share-group state
>>>> from
>>>>>>> the share-group state partitions loaded in the last 30 seconds."
>>>>>>> The window depends on metrics.num.samples and
>> metrics.sample.window.ms
>>>>>>> and is not always 30 seconds, right?
>>>>>>> 137.4 Could you explain write/write-latency a bit more? Does it
>> include
>>>>>> the
>>>>>>> time to write to the internal topic?
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
>>>>>> andrew_schofi...@live.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Jun,
>>>>>>>> Thanks for your comments.
>>>>>>>> 
>>>>>>>> 120. Thanks. Fixed.
>>>>>>>> 
>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
>>>>>>>> the update applies to. It should of course be the snapshot that
>>>> precedes
>>>>>>>> it in the log. It’s just there to provide a consistency check.
>>>>>>>> 
>>>>>>>> I also noticed that ShareSnapshotValue was missing StateEpoch. It
>>>>>>>> isn’t any more.
>>>>>>>> 
>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
>>>>>>>> GroupEpoch, but in the code it does not. In fact, there is
>>>> considerable
>>>>>>>> divergence between the KIP and the code for this record value schema
>>>>>>>> which I expect will be resolved when the migration code has been
>>>>>>>> completed.
>>>>>>>> 
>>>>>>>> 123. The share group does not persist the target assignment.
>>>>>>>> 
>>>>>>>> 124. Share groups have three kinds of record:
>>>>>>>> i) ShareGroupMetadata
>>>>>>>> - this contains the group epoch and is written whenever the group
>>>>>>>> epoch changes.
>>>>>>>> ii) ShareGroupMemberMetadata
>>>>>>>> - this does not contain the group epoch.
>>>>>>>> iii) ShareGroupPartitionMetadata
>>>>>>>> - this currently contains the epoch, but I think that is
>> unnecessary.
>>>>>>>>  For one thing, the ConsumerGroupPartitionMetadata definition
>>>>>>>>  contains the group epoch, but the value appears never to be set.
>>>>>>>>  David Jacot confirms that it’s not necessary and is removing it.
>>>>>>>> 
>>>>>>>> I have removed the Epoch from ShareGroupPartitionMetadata.
>>>>>>>> The only purpose of the persisting the epoch for a share group is so
>>>>>> that
>>>>>>>> when a group coordinator takes over the share group, it is able to
>>>>>>>> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
>>>>>>>> is used for this.
>>>>>>>> 
>>>>>>>> 125. The group epoch will be incremented in this case and
>>>>>>>> consequently a ShareGroupMetadata will be written. KIP updated.
>>>>>>>> 
>>>>>>>> 126. Not directly. A share group can only be deleted when it has no
>>>>>>>> members, so the tombstones for ShareGroupMemberMetadata will
>>>>>>>> have been written when the members left. I have clarified this.
>>>>>>>> 
>>>>>>>> 127. The share coordinator is ignorant of the group epoch. When the
>>>>>>>> group coordinator is initializing the share-group state the first
>> time
>>>>>> that
>>>>>>>> a share-partition is being added to an assignment in the group, the
>>>>>>>> group epoch is used as the state epoch. But as the group epoch
>>>>>>>> increases over time, the share coordinator is entirely unaware.
>>>>>>>> 
>>>>>>>> When the first consumer for a share-partition fetches records from a
>>>>>>>> share-partition leader, the SPL calls the share coordinator to
>>>>>>>> ReadShareGroupState. If the SPL has previously read the information
>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms it's up to
>> date
>>>>>> by
>>>>>>>> calling ReadShareGroupOffsetsState.
>>>>>>>> 
>>>>>>>> Even if many consumers are joining at the same time, any
>>>> share-partition
>>>>>>>> which is being initialized will not be included in their
>> assignments.
>>>>>> Once
>>>>>>>> the initialization is complete, the next rebalance will assign the
>>>>>>>> partition
>>>>>>>> to some consumers which will discover this by ShareGroupHeartbeat
>>>>>>>> response. And then, the fetching begins.
>>>>>>>> 
>>>>>>>> If an SPL receives a ShareFetch request before it’s read the state
>>>>>>>> from the SC, it can make the ShareFetch request wait up to MaxWaitMs
>>>>>>>> and then it can return an empty set of records if it’s still not
>>>> ready.
>>>>>>>> 
>>>>>>>> So, I don’t believe there will be too much load. If a topic with
>> many
>>>>>>>> partitions is added to the subscribed topics for a share group, the
>>>> fact
>>>>>>>> that the assignments will only start to include the partitions as
>>>> their
>>>>>>>> initialization completes should soften the impact.
>>>>>>>> 
>>>>>>>> 128, 129: The “proper” way to turn on this feature when it’s
>> finished
>>>>>> will
>>>>>>>> be using `group.coordinator.rebalance.protocols` and
>> `group.version`.
>>>>>>>> While it’s in Early Access and for test cases, the
>>>> `group.share.enable`
>>>>>>>> configuration will turn it on.
>>>>>>>> 
>>>>>>>> I have described `group.share.enable` as an internal configuration
>> in
>>>>>>>> the KIP.
>>>>>>>> 
>>>>>>>> 130. The config `group.share.record.lock.duration.ms` applies to
>>>> groups
>>>>>>>> which do not specify a group-level configuration for lock duration.
>>>> The
>>>>>>>> minimum and maximum for this configuration are intended to give it
>>>>>>>> sensible bounds.
>>>>>>>> 
>>>>>>>> If a group does specify its own `
>> group.share.record.lock.duration.ms
>>>> `,
>>>>>>>> the broker-level `group.share.max.record.lock.duration.ms` gives
>> the
>>>>>>>> cluster administrator a way of setting a maximum value for all
>> groups.
>>>>>>>> 
>>>>>>>> While editing, I renamed `group.share.record.lock.duration.max.ms`
>> to
>>>>>>>> `group.share.max.record.lock.duration.ms` for consistency with the
>>>>>>>> rest of the min/max configurations.
>>>>>>>> 
>>>>>>>> 131. This is the limit per partition so you can go wider with
>> multiple
>>>>>>>> partitions.
>>>>>>>> I have set the initial value low for safety. I expect to be able to
>>>>>>>> increase this
>>>>>>>> significantly when we have mature code which has been battle-tested.
>>>>>>>> Rather than try to guess how high it can safely go, I’ve erred on
>> the
>>>>>> side
>>>>>>>> of
>>>>>>>> caution and expect to open it up in a future KIP.
>>>>>>>> 
>>>>>>>> 132. Good catch. The problem is that I have missed two group
>>>>>>>> configurations,
>>>>>>>> now added. These are group.share.session.timeout.ms and
>>>>>>>> group.share.heartbeat.timeout.ms . The configurations you mentioned
>>>>>>>> are the bounds for the group-level configurations.
>>>>>>>> 
>>>>>>>> 133. The name `group.share.max.size` was chosen to mirror the
>> existing
>>>>>>>> `group.consumer.max.size`.
>>>>>>>> 
>>>>>>>> 134. It is intended to be a list of all of the valid assignors for
>> the
>>>>>>>> cluster.
>>>>>>>> When the assignors are configurable at the group level, the group
>>>>>>>> configuration
>>>>>>>> will only be permitted to name an assignor which is in this list.
>> For
>>>>>> now,
>>>>>>>> there
>>>>>>>> is no group configuration for assignor, so all groups get the one
>> and
>>>>>> only
>>>>>>>> assignor in the list.
>>>>>>>> 
>>>>>>>> 135. It’s the number of threads per broker. For a cluster with a
>> small
>>>>>>>> number of
>>>>>>>> of brokers and a lot of share group activity, it may be appropriate
>> to
>>>>>>>> increase
>>>>>>>> this. We will be able to give tuning advice once we have experience
>> of
>>>>>> the
>>>>>>>> performance impact of increasing it.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao <j...@confluent.io.INVALID>
>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, Andrew,
>>>>>>>>> 
>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>> 
>>>>>>>>> 120. There is still reference to ConsumerGroupMetadataKey.
>>>>>>>>> 
>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change it since it's
>>>>>> not a
>>>>>>>>> snapshot?
>>>>>>>>> 
>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, but
>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know how the epoch is
>>>>>> being
>>>>>>>>> used in the consumer group and whether it's needed in the share
>>>> group?
>>>>>>>>> 
>>>>>>>>> 123. There is no equivalent of ConsumerGroupTargetAssignmentMember
>>>> for
>>>>>>>>> ShareGroup. How does the shareGroup persist the member assignment?
>>>>>>>>> 
>>>>>>>>> 124. Assign a share-partition: "When a topic-partition is assigned
>>>> to a
>>>>>>>>> member of a share group for the first time, the group coordinator
>>>>>> writes
>>>>>>>> a
>>>>>>>>> ShareGroupPartitionMetadata record to the __consumer_offsets
>> topic".
>>>>>>>>> When does the coordinator write ShareGroupMetadata with the bumped
>>>>>>>> epoch?
>>>>>>>>> In general, is there a particular order to bump up the epoch in
>>>>>> different
>>>>>>>>> records? When can the new epoch be exposed to the
>>>> sharePartitionLeader?
>>>>>>>> It
>>>>>>>>> would be useful to make this clear in other state changing
>> operations
>>>>>>>> too.
>>>>>>>>> 
>>>>>>>>> 125. "Alter share group offsets Group coordinator and share
>>>> coordinator
>>>>>>>>> Only empty share groups support this operation. The group
>> coordinator
>>>>>>>> sends
>>>>>>>>> an InitializeShareGroupState  request to the share coordinator. The
>>>>>> share
>>>>>>>>> coordinator writes a ShareSnapshot record with the new state epoch
>> to
>>>>>> the
>>>>>>>>> __share_group_state  topic."
>>>>>>>>> Does the operation need to write ShareGroupPartitionMetadata and
>>>>>>>>> ShareGroupMetadata (for the new epoch)?
>>>>>>>>> 
>>>>>>>>> 126. Delete share group: Does this operation also need to write a
>>>>>>>> tombstone
>>>>>>>>> for the ShareGroupMemberMetadata record?
>>>>>>>>> 
>>>>>>>>> 127. I was thinking about the impact on a new consumer joining the
>>>>>>>>> shareGroup. This causes the GroupCoordinator and the
>> ShareCoordinator
>>>>>> to
>>>>>>>>> bump up the group epoch, which in turn causes the SharePartition
>>>> leader
>>>>>>>> to
>>>>>>>>> reinitialize the state with ReadShareGroupOffsetsState. If many
>>>>>> consumers
>>>>>>>>> are joining a shareGroup around the same time, would there be too
>>>> much
>>>>>>>> load
>>>>>>>>> for the ShareCoordinator and SharePartition leader?
>>>>>>>>> 
>>>>>>>>> 128. Since group.share.enable will be replaced by the group.version
>>>>>>>>> feature, should we make it an internal config?
>>>>>>>>> 
>>>>>>>>> 129. group.coordinator.rebalance.protocols: this seems redundant
>> with
>>>>>>>>> group.share.enable?
>>>>>>>>> 
>>>>>>>>> 130. Why do we have both group.share.record.lock.duration.ms and
>>>>>>>>> group.share.record.lock.duration.max.ms, each with its own max
>>>> value?
>>>>>>>>> 
>>>>>>>>> 131. group.share.record.lock.partition.limit defaults to 200. This
>>>>>> limits
>>>>>>>>> the max degree of consumer parallelism to 200, right? If there are
>>>>>>>> multiple
>>>>>>>>> records per batch, it could be even smaller.
>>>>>>>>> 
>>>>>>>>> 132. Why do we need all three of group.share.session.timeout.ms,
>>>>>>>>> group.share.min.session.timeout.ms and
>>>>>>>> group.share.max.session.timeout.ms?
>>>>>>>>> Session timeout can't be set by the client. Ditto for
>>>>>>>>> group.share.heartbeat.interval.ms.
>>>>>>>>> 
>>>>>>>>> 133. group.share.max.size: Would group.share.max.members.per.group
>>>> be a
>>>>>>>>> more intuitive name?
>>>>>>>>> 
>>>>>>>>> 134. group.share.assignors: Why does it need to be a list?
>>>>>>>>> 
>>>>>>>>> 135. share.coordinator.threads: Is that per share coordinator or
>> per
>>>>>>>> broker?
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Jun,
>>>>>>>>>> Thanks for you reply.
>>>>>>>>>> 
>>>>>>>>>> 42.1. That’s a sensible improvement. Done.
>>>>>>>>>> 
>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to FirstOffset.
>>>>>>>>>> 
>>>>>>>>>> 105. I think that would be in a future KIP. Personally, I don’t
>> mind
>>>>>>>> having
>>>>>>>>>> a non-contiguous set of values in this KIP.
>>>>>>>>>> 
>>>>>>>>>> 114. Done.
>>>>>>>>>> 
>>>>>>>>>> 115. If the poll is just returning a single record because there
>> is
>>>>>> not
>>>>>>>>>> much
>>>>>>>>>> data to consume, committing on every record is OK. It’s
>> inefficient
>>>>>> but
>>>>>>>>>> acceptable.
>>>>>>>>>> If the first poll returns just one record, but many more have
>> piled
>>>> up
>>>>>>>>>> while
>>>>>>>>>> the first one was being processed, the next poll has the
>> opportunity
>>>>>> to
>>>>>>>>>> return
>>>>>>>>>> a bunch of records and then these will be able to be committed
>>>>>> together.
>>>>>>>>>> So, my answer is that optimisation on the broker to return batches
>>>> of
>>>>>>>>>> records when the records are available is the approach we will
>> take
>>>>>>>> here.
>>>>>>>>>> 
>>>>>>>>>> 116. Good idea. Done.
>>>>>>>>>> 
>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration section. Let me
>> know
>>>>>> what
>>>>>>>>>> you think.
>>>>>>>>>> I am discussing the configuration to enable the feature in the
>>>> mailing
>>>>>>>>>> list with
>>>>>>>>>> David Jacot also, so I anticipate a bit of change in this area
>>>> still.
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>> 
>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao <j...@confluent.io.INVALID>
>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the updated KIP.
>>>>>>>>>>> 
>>>>>>>>>>> 42.1 "If the share group offset is altered multiple times when
>> the
>>>>>>>> group
>>>>>>>>>>> remains empty, it would be harmless if the same state epoch was
>>>>>> reused
>>>>>>>> to
>>>>>>>>>>> initialize the state."
>>>>>>>>>>> Hmm, how does the partition leader know for sure that it has
>>>> received
>>>>>>>> the
>>>>>>>>>>> latest share group offset if epoch is reused?
>>>>>>>>>>> Could we update the section "Group epoch - Trigger a rebalance"
>>>> that
>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group epoch to be
>>>>>> bumped
>>>>>>>>>> too?
>>>>>>>>>>> 
>>>>>>>>>>> 47,56 "my view is that BaseOffset should become FirstOffset in
>> ALL
>>>>>>>>>> schemas
>>>>>>>>>>> defined in the KIP."
>>>>>>>>>>> Yes, that seems better to me.
>>>>>>>>>>> 
>>>>>>>>>>> 105. "I have another non-terminal state in mind for 3."
>>>>>>>>>>> Should we document it?
>>>>>>>>>>> 
>>>>>>>>>>> 114. session.timeout.ms in the consumer configuration is
>>>> deprecated
>>>>>> in
>>>>>>>>>>> KIP-848. So, we need to remove it from the shareConsumer
>>>>>> configuration.
>>>>>>>>>>> 
>>>>>>>>>>> 115. I am wondering if it's a good idea to always commit acks on
>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each batch may only
>>>>>> contain
>>>>>>>> a
>>>>>>>>>>> single record and each poll() only returns a single batch. This
>>>> will
>>>>>>>>>> cause
>>>>>>>>>>> each record to be committed individually. Is there a way for a
>> user
>>>>>> to
>>>>>>>>>>> optimize this?
>>>>>>>>>>> 
>>>>>>>>>>> 116. For each new RPC, could we list the associated acls?
>>>>>>>>>>> 
>>>>>>>>>>> 117. Since this KIP changes internal records and RPCs, it would
>> be
>>>>>>>> useful
>>>>>>>>>>> to document the upgrade process.
>>>>>>>>>>> 
>>>>>>>>>>> Jun
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>> Thanks for your questions.
>>>>>>>>>>>> 
>>>>>>>>>>>> 41.
>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch in the
>> response
>>>>>>>> from
>>>>>>>>>>>> ReadShareGroupState. When it becomes a share-partition leader,
>>>>>>>>>>>> it reads the share-group state and one of the things it learns
>> is
>>>>>> the
>>>>>>>>>>>> current state epoch. Then it uses the state epoch in all
>>>> subsequent
>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to prevent writes
>>>> for
>>>>>>>>>>>> a previous state epoch, which are very unlikely but which would
>>>> mean
>>>>>>>>>>>> that a leader was using an out-of-date epoch and was likely no
>>>>>> longer
>>>>>>>>>>>> the current leader at all, perhaps due to a long pause for some
>>>>>>>> reason.
>>>>>>>>>>>> 
>>>>>>>>>>>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it
>>>>>> need
>>>>>>>>>>>> to discover the initial offset? I’m trying to avoid yet another
>>>>>>>>>>>> inter-broker
>>>>>>>>>>>> hop.
>>>>>>>>>>>> 
>>>>>>>>>>>> 42.
>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share group offset
>> is
>>>>>>>>>> altered
>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the group coordinator
>>>> WILL
>>>>>>>>>>>> update the state epoch. I don’t think it needs to update the
>> group
>>>>>>>> epoch
>>>>>>>>>>>> at the same time (although it could) because the group epoch
>> will
>>>>>> have
>>>>>>>>>>>> been bumped when the group became empty. If the share group
>> offset
>>>>>>>>>>>> is altered multiple times when the group remains empty, it would
>>>> be
>>>>>>>>>>>> harmless if the same state epoch was reused to initialize the
>>>> state.
>>>>>>>>>>>> 
>>>>>>>>>>>> When the share-partition leader updates the SPSO as a result of
>>>>>>>>>>>> the usual flow of record delivery, it does not update the state
>>>>>> epoch.
>>>>>>>>>>>> 
>>>>>>>>>>>> 42.2. The share-partition leader will notice the alteration
>>>> because,
>>>>>>>>>>>> when it issues WriteShareGroupState, the response will contain
>> the
>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to be the
>>>>>>>>>>>> last-resort way of catching this.
>>>>>>>>>>>> 
>>>>>>>>>>>> When the share-partition leader handles its first ShareFetch
>>>>>> request,
>>>>>>>>>>>> it learns the state epoch from the response to
>>>> ReadShareGroupState.
>>>>>>>>>>>> 
>>>>>>>>>>>> In normal running, the state epoch will remain constant, but,
>> when
>>>>>>>> there
>>>>>>>>>>>> are no consumers and the group is empty, it might change. As a
>>>>>> result,
>>>>>>>>>>>> I think it would be sensible when the set of share sessions
>>>>>>>> transitions
>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the share group
>>>>>>>>>> transitioning
>>>>>>>>>>>> from empty to non-empty, for the share-partition leader to issue
>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state epoch. If its
>>>> state
>>>>>>>>>>>> epoch is out of date, it can then ReadShareGroupState to
>>>>>>>> re-initialize.
>>>>>>>>>>>> 
>>>>>>>>>>>> I’ve changed the KIP accordingly.
>>>>>>>>>>>> 
>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset, we need to
>>>> have
>>>>>>>>>>>> a clear view of which is the correct term. Having reviewed all
>> of
>>>>>> the
>>>>>>>>>>>> instances, my view is that BaseOffset should become FirstOffset
>> in
>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is just used in
>>>>>>>>>>>> record batches, which is already a known concept.
>>>>>>>>>>>> 
>>>>>>>>>>>> Please let me know if you agree.
>>>>>>>>>>>> 
>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level index for
>> protocol
>>>>>>>>>> changes.
>>>>>>>>>>>> 
>>>>>>>>>>>> 61. OK. I expect you are correct about how users will be using
>> the
>>>>>>>>>>>> console share consumer. When I use the console consumer, I
>> always
>>>>>> get
>>>>>>>>>>>> a new consumer group. I have changed the default group ID for
>>>>>> console
>>>>>>>>>>>> share consumer to “console-share-consumer” to match the console
>>>>>>>> consumer
>>>>>>>>>>>> better and give more of an idea where this mysterious group has
>>>> come
>>>>>>>>>> from.
>>>>>>>>>>>> 
>>>>>>>>>>>> 77. I will work on a proposal that does not use compaction and
>> we
>>>>>> can
>>>>>>>>>>>> make a judgement about whether it’s a better course for KIP-932.
>>>>>>>>>>>> Personally,
>>>>>>>>>>>> until I’ve written it down and lived with the ideas for a few
>>>> days,
>>>>>> I
>>>>>>>>>>>> won’t be
>>>>>>>>>>>> able to choose which I prefer.
>>>>>>>>>>>> 
>>>>>>>>>>>> I should be able to get the proposal written by the end of this
>>>>>> week.
>>>>>>>>>>>> 
>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
>>>>>>>>>>>> I prefer to maintain the consistency.
>>>>>>>>>>>> 
>>>>>>>>>>>> 101. Thanks for catching this. The ShareGroupHeartbeatResponse
>> was
>>>>>>>>>>>> originally
>>>>>>>>>>>> created from KIP-848. This part of the schema does not apply
>> and I
>>>>>>>> have
>>>>>>>>>>>> removed
>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to simply
>>>>>>>>>> TopicPartitions
>>>>>>>>>>>> which
>>>>>>>>>>>> aligns with the actual definition of
>>>> ConsumerGroupHeartbeatResponse.
>>>>>>>>>>>> 
>>>>>>>>>>>> 102. No, I don’t think we do. Removed.
>>>>>>>>>>>> 
>>>>>>>>>>>> 103. I’ve changed the description for the error codes for
>>>>>>>>>>>> ShareFetchResponse.
>>>>>>>>>>>> 
>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to these RPCs as
>> you
>>>>>>>>>> suggest.
>>>>>>>>>>>> It’s a good improvement for problem determination.
>>>>>>>>>>>> 
>>>>>>>>>>>> 105. The values are reserved in my brain. Actually, 1 is
>> Acquired
>>>>>>>> which
>>>>>>>>>>>> is not persisted, and I have another non-terminal state in mind
>>>> for
>>>>>> 3.
>>>>>>>>>>>> 
>>>>>>>>>>>> 106. A few people have raised the question of whether
>>>>>>>> OffsetAndMetadata
>>>>>>>>>>>> is sensible in this KIP, given that the optional Metadata part
>>>> comes
>>>>>>>>>> from
>>>>>>>>>>>> when
>>>>>>>>>>>> a regular consumer commits offsets. However, it is correct that
>>>>>> there
>>>>>>>>>> will
>>>>>>>>>>>> never be metadata with a share group. I have changed
>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
>>>>>>>>>>>> 
>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this process that
>> a
>>>>>>>>>> version
>>>>>>>>>>>> bump
>>>>>>>>>>>> can be a logical not just a physical change to the schema. KIP
>>>>>>>> updated.
>>>>>>>>>>>> 
>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all of the states
>>>> at
>>>>>>>> this
>>>>>>>>>>>> point.
>>>>>>>>>>>> I think there is definitely scope for another KIP focused on
>>>>>>>>>> administration
>>>>>>>>>>>> of share groups that might want this information so someone
>> could
>>>>>>>> build
>>>>>>>>>> a
>>>>>>>>>>>> UI and other tools on top. Doing that well is quite a lot of
>> work
>>>> in
>>>>>>>> its
>>>>>>>>>>>> own right
>>>>>>>>>>>> so I would prefer not to do that now.
>>>>>>>>>>>> 
>>>>>>>>>>>> 109.Yes, they’re inconsistent and follow the consumer groups
>>>>>>>> equivalents
>>>>>>>>>>>> which are also inconsistent. In general, the KafkaAdmin.delete….
>>>>>>>> Methods
>>>>>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like
>>>>>>>> DeleteShareGroupsResult.
>>>>>>>>>>>> 
>>>>>>>>>>>> Would you prefer that I do that, or remain consistent with
>>>> consumer
>>>>>>>>>> groups?
>>>>>>>>>>>> Happy to change it.
>>>>>>>>>>>> 
>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level of detail
>> about
>>>>>>>>>> epochs.
>>>>>>>>>>>> The MemberDescription does include the assignment, but not the
>>>> list
>>>>>> of
>>>>>>>>>>>> subscribed topic names.
>>>>>>>>>>>> 
>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing because there’s
>>>> no
>>>>>>>>>>>> single class that includes the states of all group types.
>>>>>>>>>>>> 
>>>>>>>>>>>> 112. I think it’s good practice for the API to have
>>>>>>>>>>>> ListShareGroupOffsetSpec.
>>>>>>>>>>>> It makes evolution and extension of the API much easier. Also,
>> it
>>>>>>>>>> matches
>>>>>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec.
>>>>>>>>>>>> 
>>>>>>>>>>>> 113. ListConsumerGroupsResults.errors() is the same. I think you
>>>>>> just
>>>>>>>>>> have
>>>>>>>>>>>> to look in the exception details and the same pattern is being
>>>>>>>> followed
>>>>>>>>>>>> here.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Over the next few days, I have committed to writing a proposal
>> for
>>>>>> how
>>>>>>>>>> to
>>>>>>>>>>>> persist
>>>>>>>>>>>> share-group state that doesn’t use log compaction.
>>>>>>>>>>>> 
>>>>>>>>>>>> I am also aware that discussion with Justine Olshan on
>>>>>> read-committed
>>>>>>>>>>>> isolation level is not yet quite complete.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the detailed review.
>>>>>>>>>>>> 
>>>>>>>>>>>> Andrew
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID>
>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 41.
>>>>>>>>>>>>> 41.1 How does the partition leader obtain the group epoch to
>> set
>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
>>>>>>>>>>>>> 41.2 What's the benefit of having the group coordinator
>>>> initialize
>>>>>>>> the
>>>>>>>>>>>>> state and the partition leader set the SPSO? It seems simpler
>> to
>>>>>> have
>>>>>>>>>> the
>>>>>>>>>>>>> partition leader initialize both the state and the SPSO
>> together?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 42.
>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be bumped when the
>>>>>> share
>>>>>>>>>>>> group
>>>>>>>>>>>>> offset is altered."
>>>>>>>>>>>>> But the part on handling Alter share group offsets says "The
>>>> share
>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with the new state
>>>>>> epoch
>>>>>>>> to
>>>>>>>>>>>> the
>>>>>>>>>>>>> __share_group_state  topic." So, which is correct? We have two
>>>>>> paths
>>>>>>>> to
>>>>>>>>>>>>> update the state in the share coordinator, one from the group
>>>>>>>>>> coordinator
>>>>>>>>>>>>> and another from the partition leader. I thought the benefit of
>>>>>>>> bumping
>>>>>>>>>>>> up
>>>>>>>>>>>>> the epoch is to fence off a late request in the previous epoch
>>>> from
>>>>>>>>>>>> another
>>>>>>>>>>>>> path.
>>>>>>>>>>>>> 42.2 When the group coordinator alters the share group offset
>> in
>>>>>>>> share
>>>>>>>>>>>>> coordinator, how does the partition leader know the share group
>>>>>> state
>>>>>>>>>> has
>>>>>>>>>>>>> been altered so that it could clear its in-memory state?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base offset for the
>>>>>> batch
>>>>>>>>>> and
>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and matches
>> LastOffset.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in the top level
>>>> index
>>>>>>>> for
>>>>>>>>>>>>> Kafka protocol changes?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 61. I think it's probably ok to add time-based expiration
>> later.
>>>>>> But
>>>>>>>>>>>> using
>>>>>>>>>>>>> a default group in console-share-consumer probably won't help
>>>>>> reduce
>>>>>>>>>> the
>>>>>>>>>>>>> garbage. In the common case, the user of the console consumer
>>>>>> likely
>>>>>>>>>>>> wants
>>>>>>>>>>>>> to see the recently produced records for verification. If the
>>>>>> default
>>>>>>>>>>>> group
>>>>>>>>>>>>> doesn't provide that (because of the stale state), the user
>> will
>>>>>>>> likely
>>>>>>>>>>>>> just use a new group. It's true that we don't garbage collect
>>>> idle
>>>>>>>>>>>> topics.
>>>>>>>>>>>>> However,  the share groups are similar to consumers, which does
>>>>>>>> support
>>>>>>>>>>>>> garbage collection. Typically, we read topics more than
>> creating
>>>>>>>> them.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we could use
>>>>>>>> customized
>>>>>>>>>>>>> logic. If we go with that route, option (b) seems cleaner and
>>>> more
>>>>>>>>>>>>> optimized. Since the share states for all groups fit in memory,
>>>> we
>>>>>>>>>> could
>>>>>>>>>>>>> generate snapshots more efficiently than going through
>>>> compaction.
>>>>>>>>>>>> Having a
>>>>>>>>>>>>> separate log per share partition is probably too much overhead.
>>>>>> It's
>>>>>>>>>> more
>>>>>>>>>>>>> efficient to put the state changes for multiple share
>> partitions
>>>>>> in a
>>>>>>>>>>>>> single log.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we
>>>> name
>>>>>> it
>>>>>>>>>>>>> SessionTimeoutMs?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of
>>>>>> error
>>>>>>>>>>>> could
>>>>>>>>>>>>> we have when assigning partitions? What are the corresponding
>>>> error
>>>>>>>>>>>> codes?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 102. Do we still need
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 103. Could we list the error codes separately for
>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 104. Should we add error message for the errorCode in
>>>>>>>>>> ShareFetchResponse,
>>>>>>>>>>>>> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
>>>>>>>>>>>>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and
>>>>>>>>>> InitializeShareGroupStateResponse?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.":
>> What
>>>>>>>>>> about 1
>>>>>>>>>>>>> and 3? Are we leaving them out intentionally?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 106. Do we have usage of metadata in OffsetAndMetadata? If not,
>>>>>> could
>>>>>>>>>> we
>>>>>>>>>>>>> remove it from AdminClient and KafkaShareConsumer?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the version since it
>>>> now
>>>>>>>>>>>> supports
>>>>>>>>>>>>> a new group type "share"?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it expose all
>> the
>>>>>>>> states
>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just SPSO?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final TopicPartition
>>>>>>>>>> partition)
>>>>>>>>>>>>> DeleteShareGroupsResult exposes
>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
>>>>>>>>>>>>> Should we make them more consistent?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields like
>> GroupEpoch,
>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 111. Should GroupListing include GroupState?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
>>>>>>>>>>>>> Set<TopicPartition> directly?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know which group
>>>> has
>>>>>>>> an
>>>>>>>>>>>>> error?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jun
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>> Thanks for your questions.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the Share
>>>> Coordinator
>>>>>>>> over
>>>>>>>>>>>>>> RPCs.
>>>>>>>>>>>>>> In the general case, it’s an inter-broker call. It is probably
>>>>>>>>>> possible
>>>>>>>>>>>> to
>>>>>>>>>>>>>> optimise
>>>>>>>>>>>>>> for the situation in which the appropriate GC and SC shards
>> are
>>>>>>>>>>>>>> co-located, but the
>>>>>>>>>>>>>> KIP does not delve that deep into potential performance
>>>>>>>> optimisations.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, but I do worry
>>>> about
>>>>>>>>>>>>>> retrospectively
>>>>>>>>>>>>>> introducing a naming convention for groups. I feel that naming
>>>>>>>>>>>> conventions
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>> typically be the responsibility of the cluster administrators
>>>>>> based
>>>>>>>> on
>>>>>>>>>>>>>> organizational
>>>>>>>>>>>>>> factors, such as the name of an application.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID because the
>> group
>>>> ID
>>>>>>>> is
>>>>>>>>>>>>>> correct but
>>>>>>>>>>>>>> the group is the wrong type. The nearest existing error code
>>>> that
>>>>>>>> gets
>>>>>>>>>>>>>> that across
>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing
>>>>>> that
>>>>>>>> a
>>>>>>>>>>>> new
>>>>>>>>>>>>>> error code would be better.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have removed them.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every change, but only a
>>>>>> subset
>>>>>>>> is
>>>>>>>>>>>>>> relevant
>>>>>>>>>>>>>> for rebalancing. A check is done against the names of the
>>>>>> subscribed
>>>>>>>>>>>>>> topics to
>>>>>>>>>>>>>> see if any relevant changes may have occurred. Then the
>> changes
>>>>>>>> which
>>>>>>>>>>>>>> trigger
>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change in
>> partitions,
>>>> or
>>>>>>>>>> rack
>>>>>>>>>>>>>> IDs for the
>>>>>>>>>>>>>> replicas. I have updated the KIP to make this more clear.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 75. The assignment is not persisted because it is much less
>>>>>>>> important
>>>>>>>>>>>> that
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> assignment survives a GC change. There’s no need to transfer
>>>>>>>>>> partitions
>>>>>>>>>>>>>> safely from
>>>>>>>>>>>>>> member to member in the way that is required for consumer
>>>> groups,
>>>>>> so
>>>>>>>>>> as
>>>>>>>>>>>> an
>>>>>>>>>>>>>> optimisation, the assignments for a share group are not
>>>> persisted.
>>>>>>>> It
>>>>>>>>>>>>>> wouldn’t do any
>>>>>>>>>>>>>> harm, but it just seems unnecessary.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 76. In the event that a consumer tries to acknowledge a record
>>>>>> that
>>>>>>>> it
>>>>>>>>>>>> now
>>>>>>>>>>>>>> longer
>>>>>>>>>>>>>> has the right to acknowledge, the INVALID_RECORD_STATE error
>>>> code
>>>>>> is
>>>>>>>>>>>> used.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If the application uses the KafkaShareConsumer.commitSync
>>>> method,
>>>>>> it
>>>>>>>>>>>> will
>>>>>>>>>>>>>> see an InvalidRecordState exception returned. Alternatively,
>> the
>>>>>>>>>>>>>> application can
>>>>>>>>>>>>>> register an acknowledgement commit callback which will be
>> called
>>>>>>>> with
>>>>>>>>>>>> the
>>>>>>>>>>>>>> status
>>>>>>>>>>>>>> of the acknowledgements that have succeeded or failed.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 77. I have tried to tread a careful path with the durable
>>>>>>>>>>>> share-partition
>>>>>>>>>>>>>> state in this
>>>>>>>>>>>>>> KIP. The significant choices I made are that:
>>>>>>>>>>>>>> * Topics are used so that the state is replicated between
>>>> brokers.
>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the storage.
>>>>>>>>>>>>>> * Only one topic is required.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Log compaction as it stands is not ideal for this kind of
>> data,
>>>> as
>>>>>>>>>>>>>> evidenced by
>>>>>>>>>>>>>> the DeltaIndex technique I employed.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I can think of a few relatively simple ways to improve upon
>> it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> a) We could use a customised version of the log compactor for
>>>> this
>>>>>>>>>> topic
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and ShareDelta
>>>> records.
>>>>>>>>>>>>>> Essentially,
>>>>>>>>>>>>>> for each share-partition, the latest ShareCheckpoint and any
>>>>>>>>>> subsequent
>>>>>>>>>>>>>> ShareDelta
>>>>>>>>>>>>>> records must not be cleaned. Anything else can be cleaned. We
>>>>>> could
>>>>>>>>>> then
>>>>>>>>>>>>>> be sure
>>>>>>>>>>>>>> that multiple ShareDelta records with the same key would
>> survive
>>>>>>>>>>>> cleaning
>>>>>>>>>>>>>> and we could
>>>>>>>>>>>>>> abandon the DeltaIndex technique.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> b) Actually what is required is a log per share-partition.
>> Let’s
>>>>>>>>>> imagine
>>>>>>>>>>>>>> that we had
>>>>>>>>>>>>>> a share-state topic per topic being consumed in a share group,
>>>>>> with
>>>>>>>>>> the
>>>>>>>>>>>>>> same number
>>>>>>>>>>>>>> of partitions as the topic being consumed. We could write many
>>>>>> more
>>>>>>>>>>>> deltas
>>>>>>>>>>>>>> between
>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints to keep
>> control
>>>> of
>>>>>>>> the
>>>>>>>>>>>>>> storage used.
>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use
>>>>>>>>>>>> KafkaAdmin.deleteRecords()
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> prune all of the older records.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The share-state topics would be more numerous, but we are
>>>> talking
>>>>>>>> one
>>>>>>>>>>>> per
>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>> per share group that it’s being consumed in. These topics
>> would
>>>>>> not
>>>>>>>> be
>>>>>>>>>>>>>> compacted.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister interface is intended
>> to
>>>>>> be
>>>>>>>>>>>>>> pluggable one day.
>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. It seems likely to
>> me
>>>>>>>> that
>>>>>>>>>>>>>> future KIPs will
>>>>>>>>>>>>>> improve upon it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to change this
>>>> KIP.
>>>>>>>>>> While
>>>>>>>>>>>>>> option (a) is
>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack to have a
>>>>>> customised
>>>>>>>>>> log
>>>>>>>>>>>>>> compactor
>>>>>>>>>>>>>> just for this topic.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State is overloaded.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 79. See (77).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <
>>>> david.art...@confluent.io
>>>>>>>>>>>> .INVALID>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I've finally made it through the KIP, still trying to grok
>> the
>>>>>>>> whole
>>>>>>>>>>>>>> thing.
>>>>>>>>>>>>>>> Sorry if some of my questions are basic :)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Concepts:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with the Share
>>>>>>>> Coordinator
>>>>>>>>>>>>>> over
>>>>>>>>>>>>>>> RPC or directly in-process?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 71. For preventing name collisions with regular consumer
>>>> groups,
>>>>>>>>>> could
>>>>>>>>>>>> we
>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the operator
>>>> defines
>>>>>>>>>> "sg_"
>>>>>>>>>>>>>> as a
>>>>>>>>>>>>>>> prefix for share groups only, and if a regular consumer group
>>>>>> tries
>>>>>>>>>> to
>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>> that name it fails.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group, or a share
>>>>>> consumer
>>>>>>>>>>>> tries
>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID make more
>> sense
>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> --------
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Share Group Membership:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 73. What goes in the Metadata field for
>> TargetAssignment#Member
>>>>>> and
>>>>>>>>>>>>>>> Assignment?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we rebalance when the
>>>>>>>>>> partition
>>>>>>>>>>>>>>> metadata changes. Would this be for any change, or just
>> certain
>>>>>>>> ones?
>>>>>>>>>>>> For
>>>>>>>>>>>>>>> example, if a follower drops out of the ISR and comes back,
>> we
>>>>>>>>>> probably
>>>>>>>>>>>>>>> don't need to rebalance.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator does *not*
>>>> persist
>>>>>>>> the
>>>>>>>>>>>>>>> assignment" Can you explain why this is not needed?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 76. " If the consumer just failed to heartbeat due to a
>>>> temporary
>>>>>>>>>>>> pause,
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> could in theory continue to fetch and acknowledge records.
>> When
>>>>>> it
>>>>>>>>>>>>>> finally
>>>>>>>>>>>>>>> sends a heartbeat and realises it’s been kicked out of the
>>>> group,
>>>>>>>> it
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>> stop fetching records because its assignment has been
>> revoked,
>>>>>> and
>>>>>>>>>>>> rejoin
>>>>>>>>>>>>>>> the group."
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> A consumer with a long pause might still deliver some
>> buffered
>>>>>>>>>> records,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>> if the share group coordinator has expired its session, it
>>>>>> wouldn't
>>>>>>>>>>>>>> accept
>>>>>>>>>>>>>>> acknowledgments for that share consumer. In such a case, is
>> any
>>>>>>>> kind
>>>>>>>>>> of
>>>>>>>>>>>>>>> error raised to the application like "hey, I know we gave you
>>>>>> these
>>>>>>>>>>>>>>> records, but really we shouldn't have" ?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> -----
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Record Delivery and acknowledgement
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is written at
>> least
>>>>>>>> every
>>>>>>>>>> so
>>>>>>>>>>>>>>> often, could we add a new log compactor that avoids
>> compacting
>>>>>>>>>>>>>> ShareDelta-s
>>>>>>>>>>>>>>> that are still "active" (i.e., not yet superceded by a new
>>>>>>>>>>>>>>> ShareCheckpoint). Mechnically, this could be done by keeping
>>>> the
>>>>>>>> LSO
>>>>>>>>>> no
>>>>>>>>>>>>>>> greater than the oldest "active" ShareCheckpoint. This might
>>>> let
>>>>>> us
>>>>>>>>>>>>>> remove
>>>>>>>>>>>>>>> the DeltaIndex thing.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 78. Instead of the State in the ShareDelta/Checkpoint
>> records,
>>>>>> how
>>>>>>>>>>>> about
>>>>>>>>>>>>>>> MessageState? (State is kind of overloaded/ambiguous)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 79. One possible limitation with the current persistence
>> model
>>>> is
>>>>>>>>>> that
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>> the share state is stored in one topic. It seems like we are
>>>>>> going
>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>>>> storing a lot more state than we do in __consumer_offsets
>> since
>>>>>>>> we're
>>>>>>>>>>>>>>> dealing with message-level acks. With aggressive
>> checkpointing
>>>>>> and
>>>>>>>>>>>>>>> compaction, we can mitigate the storage requirements, but the
>>>>>>>>>>>> throughput
>>>>>>>>>>>>>>> could be a limiting factor. Have we considered other
>>>>>> possibilities
>>>>>>>>>> for
>>>>>>>>>>>>>>> persistence?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 

Reply via email to