Hi, Andrew,

Thanks for addressing all the comments. The KIP looks good to me now.

Jun On Mon, May 6, 2024 at 2:15 PM Andrew Schofield <andrew_schofi...@live.com> wrote: > Hi Jun, > I have removed AdminClient.listGroups and the associated classes and > interfaces. > > Version 6 of the ListGroups RPC remains because it adds support for share > groups. > This is needed to list share groups for the admin client and the > command-line > tools. > > Thanks, > Andrew > > > On 6 May 2024, at 19:26, Jun Rao <j...@confluent.io.INVALID> wrote: > > > > Hi, Andrew, > > > > Removing AdminClient.listGroups() and the LisGroups RPC for now sounds > good > > to me. > > > > Thanks, > > > > Jun > > > > On Mon, May 6, 2024 at 11:10 AM Andrew Schofield < > andrew_schofi...@live.com> > > wrote: > > > >> Hi Jun, > >> Thanks for the reply. > >> > >> 162. I’ve mentioned before that I plan another KIP for administration > >> of groups. I think this is heading into that territory. I would like > >> listGroups() to do a comprehensive job of returning all of the groups, > >> and that does include groups which aren’t currently covered by > GroupType. > >> There’s a single namespace for all groups, and if someone is to make > sense > >> of the groups in a cluster, I think they need to be able to see them > all. > >> This is really just putting an API on top of the ListGroups RPC that > >> already > >> exists. > >> > >> I don’t think it would be desirable for connect-based groups to > >> have GroupType.UNKNOWN. There are other custom group types. > >> This all needs sorting out, I think. > >> > >> I propose to remove AdminClient.listGroups() from this KIP, and put > >> it in the administration KIP. > >> > >> Let me know what you think. > >> > >> Thanks, > >> Andrew > >> > >> > >>> On 6 May 2024, at 18:04, Jun Rao <j...@confluent.io.INVALID> wrote: > >>> > >>> Hi, Andrew, > >>> > >>> Thanks for the reply. > >>> > >>> 162. It's fine to start with just the group type. Since ListGroups() > is a > >>> generic API, I want to make sure that it covers all existing groups. > >>> Currently, GroupType only has "classic" and "consumer", both of which > >> seem > >>> to be related to groups formed by consumers since it's part of > >>> ConsumerGroupDescription. Does ListGroup() return connect based groups > >> and > >>> if so, what's the GroupType? If ListGroup() doesn't cover all groups, > >>> should we name it more accurately? > >>> > >>> Jun > >>> > >>> > >>> On Fri, May 3, 2024 at 7:51 PM Andrew Schofield < > >> andrew_schofi...@live.com> > >>> wrote: > >>> > >>>> Hi Jun, > >>>> Thanks for your reply. > >>>> > >>>> 161. ShareGroupListing and ShareGroupDescription are using > >>>> the same pattern as ConsumerGroupListing and > >>>> ConsumerGroupDescription. I have gone for consistency which > >>>> I think is probably best here. It’s what I would expect if I had > >> previously > >>>> used the admin API for consumer groups and was looking to use it for > >>>> share groups. I agree it’s a bit weird. > >>>> > >>>> 162. GroupListing contains the only information which is properly > >>>> in common between a ConsumerGroupListing and a ShareGroupListing. > >>>> ListGroupsResponse.ProtocolType is interpreted to provide the > >>>> group type. I know that the ListGroups RPC also includes the group > >>>> state, but that’s as a string and there’s no common enum for the > states > >>>> of all types of group. As a result, I have exposed group type but not > >>>> state on this API. 
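
As an illustration of the group type mapping being discussed here: a client-side enum derived from the ListGroups response (the KIP interprets ListGroupsResponse.ProtocolType for this) might fall back to UNKNOWN for types it does not recognise, roughly as in the sketch below. This is a simplified sketch only, not the actual GroupType class, and the SHARE constant is simply assumed from this KIP.

    import java.util.Locale;

    // Simplified sketch only -- not the actual GroupType class in Kafka. It shows the
    // fallback-to-UNKNOWN behaviour when a client maps the group type string returned
    // by ListGroups to a type it does not recognise.
    public enum GroupTypeSketch {
        UNKNOWN("unknown"),
        CLASSIC("classic"),
        CONSUMER("consumer"),
        SHARE("share");       // assumed from this KIP

        private final String name;

        GroupTypeSketch(String name) {
            this.name = name;
        }

        public static GroupTypeSketch parse(String protocolType) {
            if (protocolType == null || protocolType.isEmpty()) {
                return UNKNOWN;
            }
            String lowered = protocolType.toLowerCase(Locale.ROOT);
            for (GroupTypeSketch type : values()) {
                if (type.name.equals(lowered)) {
                    return type;
                }
            }
            // A connect-based group or any custom group type lands here.
            return UNKNOWN;
        }
    }

With a mapping like this, a connect-based group or any custom group type ends up as UNKNOWN, which is exactly the outcome the discussion above would prefer to sort out properly in the later administration KIP.
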
> >>>> > >>>> Previously in the discussion for this KIP, I mentioned that I would > >>>> create another KIP for the administration of groups, in particular > >>>> how the administrator can ensure that particular group IDs > >>>> are used for the group type they desire. At the moment, I think > >>>> keeping ListGroups in this KIP is a good idea. If we actually want > >>>> to make it more sophisticated, perhaps that would be better with > >>>> the group administration KIP. > >>>> > >>>> 163. It will be one higher than the latest version at the time we are > >>>> ready to deliver this feature for real. When we are on the cusp of > >>>> delivery, I’ll update the KIP with the final value. > >>>> > >>>> 164. KRaft only. All the RPCs are “broker” only. None of the code will > >>>> be merged until after 3.8 has branched. > >>>> > >>>> Thanks, > >>>> Andrew > >>>> > >>>>> On 4 May 2024, at 00:12, Jun Rao <j...@confluent.io.INVALID> wrote: > >>>>> > >>>>> Hi, Andrew, > >>>>> > >>>>> Thanks for the reply. A few more comments. > >>>>> > >>>>> 161. ShareGroupListing.state() returns an optional, but > >>>>> ShareGroupDescription.state() does not. Should we make them > consistent? > >>>>> Also, it seems a bit weird to return optional with an UNKNOWN state. > >>>>> > >>>>> 162. Should GroupListing include ProtocolType and GroupState too? > >>>>> > >>>>> 163. What is the value of group.version to gate the queuing feature? > >>>>> > >>>>> 164. Is the queueing feature only supported on KRaft clusters? For > >>>> example, > >>>>> the feature tool seems to be built only for the KRaft cluster. > >>>>> > >>>>> Jun > >>>>> > >>>>> On Fri, May 3, 2024 at 10:32 AM Andrew Schofield < > >>>> andrew_schofi...@live.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Jun, > >>>>>> Thanks for your reply. > >>>>>> > >>>>>> 147. Yes, I see what you mean. The rebalance latency will indeed > >>>>>> be very short by comparison. I have removed the rebalance latency > >>>>>> metrics from the client and retained the rebalance count and rate. > >>>>>> > >>>>>> 150. Yes, I think so. I have tweaked the text so that the simple > >>>>>> assignor will take into account existing assignment information when > >>>>>> it has it, which would just minimise unnecessary churn of (b). > >>>>>> > >>>>>> 158. I’ve changed it to ReadShareGroupStateSummary. > >>>>>> > >>>>>> Thanks, > >>>>>> Andrew > >>>>>> > >>>>>> > >>>>>>> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote: > >>>>>>> > >>>>>>> Hi, Andrew, > >>>>>>> > >>>>>>> Thanks for the reply. > >>>>>>> > >>>>>>> 147. There seems to be some difference between consumer groups and > >>>> share > >>>>>>> groups. In the consumer groups, if a client receives a heartbeat > >>>> response > >>>>>>> to revoke some partitions, it may have to commit offsets before > >>>> revoking > >>>>>>> partitions or it may have to call the rebalance callbacks provided > by > >>>> the > >>>>>>> user. This may take some time and can be reflected in the rebalance > >>>> time > >>>>>>> metric. In the share groups, none of that exists. If a client > >> receives > >>>>>> some > >>>>>>> added/revoked partitions, it accepts them immediately, right? So, > >> does > >>>>>> that > >>>>>>> practically make the rebalance time always 0? > >>>>>>> > >>>>>>> 150. I guess in the common case, there will be many more members > than > >>>>>>> partitions. So the need for (b) will be less common. We can > probably > >>>>>> leave > >>>>>>> the persisting of the assignment out for now. > >>>>>>> > >>>>>>> 158. 
The new name sounds good to me. > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield < > >>>>>> andrew_schofi...@live.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Jun, > >>>>>>>> Thanks for the response. > >>>>>>>> > >>>>>>>> 147. I am trying to get a correspondence between the concepts and > >>>>>>>> metrics of consumer groups and share groups. In both cases, > >>>>>>>> the client doesn’t strictly know when the rebalance starts. All it > >>>> knows > >>>>>>>> is when it has work to do in order to perform its part of a > >> rebalance. > >>>>>>>> I am proposing that share groups and consumer groups use > >>>>>>>> equivalent logic. > >>>>>>>> > >>>>>>>> I could remove the rebalance metrics from the client because I do > >>>>>>>> understand that they are making a judgement about when a rebalance > >>>>>>>> starts, but it’s their own part of the rebalance they are > measuring. > >>>>>>>> > >>>>>>>> I tend to think these metrics are better than no metrics and > >>>>>>>> will at least enable administrators to see how much rebalance > >>>>>>>> activity the members of share groups are experiencing. > >>>>>>>> > >>>>>>>> 150. The simple assignor does not take existing assignments into > >>>>>>>> consideration. The ShareGroupPartitionAssignor interface would > >>>>>>>> permit this, but the simple assignor does not currently use it. > >>>>>>>> > >>>>>>>> The simple assignor assigns partitions in two ways: > >>>>>>>> a) Distribute the members across the partitions by hashed member > ID. > >>>>>>>> b) If any partitions have no members assigned, distribute the > >> members > >>>>>>>> across these partitions round-robin. > >>>>>>>> > >>>>>>>> The (a) partitions will be quite stable. The (b) partitions will > be > >>>> less > >>>>>>>> stable. By using existing assignment information, it could make > (b) > >>>>>>>> partition assignment more stable, whether the assignments are > >>>>>>>> persisted or not. Perhaps it would be worth changing the simple > >>>>>>>> assignor in order to make (b) more stable. > >>>>>>>> > >>>>>>>> I envisage more sophisticated assignors in the future which could > >> use > >>>>>>>> existing assignments and also other dynamic factors such as lag. > >>>>>>>> > >>>>>>>> If it transpires that there is significant benefit in persisting > >>>>>>>> assignments > >>>>>>>> specifically to help smooth assignment in the event of GC change, > >>>>>>>> it would be quite an easy enhancement. I am not inclined to > persist > >>>>>>>> the assignments in this KIP. > >>>>>>>> > >>>>>>>> 158. Ah, yes. I see. Of course, I want the names as consistent and > >>>>>>>> understandable too. I suggest renaming > >>>>>>>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary. > >>>>>>>> I haven’t changed the KIP yet, so let me know if that’s OK. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Andrew > >>>>>>>> > >>>>>>>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID> > wrote: > >>>>>>>>> > >>>>>>>>> Hi, Andrew, > >>>>>>>>> > >>>>>>>>> Thanks for the reply. > >>>>>>>>> > >>>>>>>>> 147. " it makes a judgement about whether an assignment received > is > >>>>>> equal > >>>>>>>>> to what it already is using." > >>>>>>>>> If a client receives an assignment different from what it has, it > >>>>>>>> indicates > >>>>>>>>> the end of the rebalance. But how does the client know when the > >>>>>> rebalance > >>>>>>>>> starts? In the shareHeartbeat design, the new group epoch is > >>>> propagated > >>>>>>>>> together with the new assignment in the response. 
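
For illustration, the two-step assignment that the simple assignor performs (quoted above) might look roughly like the following. This is a sketch only -- it assumes every member subscribes to every partition and uses String.hashCode() as the member-ID hash, neither of which is necessarily how the real simple assignor works.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Rough sketch of the two-step simple assignment described above. Not the actual
    // simple assignor implementation.
    public class SimpleAssignmentSketch {

        // Returns, per member ID, the set of partition indexes assigned to it.
        public static Map<String, Set<Integer>> assign(List<String> memberIds, int numPartitions) {
            Map<String, Set<Integer>> assignment = new HashMap<>();
            if (memberIds.isEmpty() || numPartitions <= 0) {
                return assignment;
            }

            // (a) Distribute the members across the partitions by hashed member ID.
            Set<Integer> partitionsWithMembers = new HashSet<>();
            for (String memberId : memberIds) {
                int partition = Math.abs(memberId.hashCode() % numPartitions);
                assignment.computeIfAbsent(memberId, k -> new HashSet<>()).add(partition);
                partitionsWithMembers.add(partition);
            }

            // (b) Distribute the members round-robin across any partitions left without a member.
            List<Integer> unassignedPartitions = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                if (!partitionsWithMembers.contains(p)) {
                    unassignedPartitions.add(p);
                }
            }
            int next = 0;
            for (int partition : unassignedPartitions) {
                String memberId = memberIds.get(next % memberIds.size());
                assignment.get(memberId).add(partition);
                next++;
            }
            return assignment;
        }
    }

In a sketch like this, the step (a) assignments stay fairly stable as members come and go, while step (b) is the part that could be made more stable by taking existing assignments into account, as discussed above.
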
> >>>>>>>>> > >>>>>>>>> 150. It could be a potential concern if each GC change forces > >>>>>> significant > >>>>>>>>> assignment changes. Does the default assignor take existing > >>>> assignments > >>>>>>>>> into consideration? > >>>>>>>>> > >>>>>>>>> 155. Good point. Sounds good. > >>>>>>>>> > >>>>>>>>> 158. My concern with the current naming is that it's not clear > what > >>>> the > >>>>>>>>> difference is between ReadShareGroupOffsetsState and > >>>>>> ReadShareGroupState. > >>>>>>>>> The state in the latter is also offsets. > >>>>>>>>> > >>>>>>>>> Jun > >>>>>>>>> > >>>>>>>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield < > >>>>>>>> andrew_schofi...@live.com> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Jun, > >>>>>>>>>> Thanks for your reply. > >>>>>>>>>> > >>>>>>>>>> 147. Perhaps the easiest is to take a look at the code in > >>>>>>>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl. > >>>>>>>>>> This class is part of the new consumer group protocol > >>>>>>>>>> code in the client. It makes state transitions based on > >>>>>>>>>> the heartbeat requests and responses, and it makes a > >>>>>>>>>> judgement about whether an assignment received is > >>>>>>>>>> equal to what it already is using. When a state transition > >>>>>>>>>> is deemed to be the beginning or end of a rebalance > >>>>>>>>>> from the point of view of this client, it counts towards the > >>>>>>>>>> rebalance metrics. > >>>>>>>>>> > >>>>>>>>>> Share groups will follow the same path. > >>>>>>>>>> > >>>>>>>>>> 150. I do not consider it a concern. Rebalancing a share group > >>>>>>>>>> is less disruptive than rebalancing a consumer group. If the > >>>> assignor > >>>>>>>>>> Has information about existing assignments, it can use it. It is > >>>>>>>>>> true that this information cannot be replayed from a topic and > >> will > >>>>>>>>>> sometimes be unknown as a result. > >>>>>>>>>> > >>>>>>>>>> 151. I don’t want to rename TopicPartitionsMetadata to > >>>>>>>>>> simply TopicPartitions (it’s information about the partitions of > >>>>>>>>>> a topic) because we then have an array of plurals. > >>>>>>>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome. > >>>>>>>>>> > >>>>>>>>>> 152. Fixed. > >>>>>>>>>> > >>>>>>>>>> 153. It’s the GC. Fixed. > >>>>>>>>>> > >>>>>>>>>> 154. The UNKNOWN “state” is essentially a default for situations > >>>> where > >>>>>>>>>> the code cannot understand data it received. For example, let’s > >> say > >>>>>> that > >>>>>>>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka > 4.1 > >>>>>>>>>> introduced another state THINKING, a tool built with Kafka 4.0 > >> would > >>>>>> not > >>>>>>>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that > >> the > >>>>>>>>>> state was something that it could not understand. > >>>>>>>>>> > >>>>>>>>>> 155. No, it’s a the level of the share-partition. If the offsets > >> for > >>>>>>>> just > >>>>>>>>>> one share-partition is reset, only the state epoch for that > >>>> partition > >>>>>> is > >>>>>>>>>> updated. > >>>>>>>>>> > >>>>>>>>>> 156. Strictly speaking, it’s redundant. I think having the > >>>> StartOffset > >>>>>>>>>> separate gives helpful clarity and I prefer to retain it. > >>>>>>>>>> > >>>>>>>>>> 157. Yes, you are right. There’s no reason why a leader change > >> needs > >>>>>>>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the > >> ShareUpdate. > >>>>>>>>>> > >>>>>>>>>> 158. 
Although ReadShareGroupOffsetsState is a bit of a mouthful, > >>>>>>>>>> having “State” in the name makes it clear that this one the > family > >>>> of > >>>>>>>>>> inter-broker RPCs served by the share coordinator. The admin > RPCs > >>>>>>>>>> such as DescribeShareGroupOffsets do not include “State”. > >>>>>>>>>> > >>>>>>>>>> 159. Fixed. > >>>>>>>>>> > >>>>>>>>>> 160. Fixed. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> Andrew > >>>>>>>>>> > >>>>>>>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID> > >> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>> > >>>>>>>>>>> 147. "The measurement is certainly from the point of view of > the > >>>>>>>> client, > >>>>>>>>>>> but it’s driven by sending and receiving heartbeats rather than > >>>>>> whether > >>>>>>>>>> the > >>>>>>>>>>> client triggered the rebalance itself." > >>>>>>>>>>> Hmm, how does a client know which heartbeat response starts a > >>>>>>>> rebalance? > >>>>>>>>>>> > >>>>>>>>>>> 150. PartitionAssignor takes existing assignments into > >>>> consideration. > >>>>>>>>>> Since > >>>>>>>>>>> GC doesn't persist the assignment for share groups, it means > that > >>>>>>>>>>> ShareGroupPartitionAssignor can't reliably depend on existing > >>>>>>>>>> assignments. > >>>>>>>>>>> Is that a concern? > >>>>>>>>>>> > >>>>>>>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename > >>>>>>>>>>> TopicPartitionsMetadata and TopicMetadata since there is no > >>>> metadata? > >>>>>>>>>>> > >>>>>>>>>>> 152. ShareGroupMetadataKey: "versions": "3" > >>>>>>>>>>> The versions should be 11. > >>>>>>>>>>> > >>>>>>>>>>> 153. ShareGroupDescription.coordinator(): The description says > >> "The > >>>>>>>> share > >>>>>>>>>>> group coordinator". Is that the GC or SC? > >>>>>>>>>>> > >>>>>>>>>>> 154. "A share group has only three states - EMPTY , STABLE and > >>>> DEAD". > >>>>>>>>>>> What about UNKNOWN? > >>>>>>>>>>> > >>>>>>>>>>> 155. WriteShareGroupState: StateEpoch is at the group level, > not > >>>>>>>>>> partition > >>>>>>>>>>> level, right? > >>>>>>>>>>> > >>>>>>>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's > the > >>>> same > >>>>>>>> as > >>>>>>>>>>> the smallest FirstOffset in StateBatches? > >>>>>>>>>>> > >>>>>>>>>>> 157. Every leader change forces a ShareSnapshotValue write to > >>>> persist > >>>>>>>> the > >>>>>>>>>>> new leader epoch. Is that a concern? An alternative is to > include > >>>>>>>>>>> leaderEpoch in ShareUpdateValue. > >>>>>>>>>>> > >>>>>>>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets. > Should > >>>> we > >>>>>>>>>> rename > >>>>>>>>>>> it to something like ReadShareGroupStartOffset? > >>>>>>>>>>> > >>>>>>>>>>> 159. members are assigned members round-robin => members are > >>>> assigned > >>>>>>>>>>> round-robin > >>>>>>>>>>> > >>>>>>>>>>> 160. "may called": typo > >>>>>>>>>>> > >>>>>>>>>>> Jun > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield < > >>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>> Thanks for the reply and sorry for the delay in responding. > >>>>>>>>>>>> > >>>>>>>>>>>> 123. Yes, I didn’t quite get your point earlier. The member > >>>>>>>>>>>> epoch is bumped by the GC when it sends a new assignment. > >>>>>>>>>>>> When the member sends its next heartbeat, it echoes back > >>>>>>>>>>>> the member epoch, which will confirm the receipt of the > >>>>>>>>>>>> assignment. 
It would send the same member epoch even > >>>>>>>>>>>> after recovery of a network disconnection, so that should > >>>>>>>>>>>> be sufficient to cope with this eventuality. > >>>>>>>>>>>> > >>>>>>>>>>>> 125. Yes, I have added it to the table which now matches > >>>>>>>>>>>> the text earlier in the KIP. Thanks. > >>>>>>>>>>>> > >>>>>>>>>>>> 140. Yes, I have added it to the table which now matches > >>>>>>>>>>>> the text earlier in the KIP. I’ve also added more detail for > >>>>>>>>>>>> the case where the entire share group is being deleted. > >>>>>>>>>>>> > >>>>>>>>>>>> 141. Yes! Sorry for confusing things. > >>>>>>>>>>>> > >>>>>>>>>>>> Back to the original question for this point. To delete a > share > >>>>>>>>>>>> group, should the GC write a tombstone for each > >>>>>>>>>>>> ShareGroupMemberMetadata record? > >>>>>>>>>>>> > >>>>>>>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata > >>>>>>>>>>>> records. But, deletion of a share group is only possible when > >>>>>>>>>>>> the group is already empty, so the tombstones will have > >>>>>>>>>>>> been written as a result of the members leaving the group. > >>>>>>>>>>>> > >>>>>>>>>>>> 143. Yes, that’s right. > >>>>>>>>>>>> > >>>>>>>>>>>> 147. The measurement is certainly from the point of view > >>>>>>>>>>>> of the client, but it’s driven by sending and receiving > >> heartbeats > >>>>>>>>>>>> rather than whether the client triggered the rebalance itself. > >>>>>>>>>>>> The client decides when it enters and leaves reconciliation > >>>>>>>>>>>> of the assignment, and measures this period. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> Andrew > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID> > >>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 123. "Rather than add the group epoch to the > >>>> ShareGroupHeartbeat, I > >>>>>>>>>> have > >>>>>>>>>>>>> decided to go for TopicPartitions in > ShareGroupHeartbeatRequest > >>>>>> which > >>>>>>>>>>>>> mirrors ConsumerGroupHeartbeatRequest." > >>>>>>>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is > >>>> that > >>>>>>>>>> enough > >>>>>>>>>>>>> for confirming the receipt of the new assignment? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 125. This also means that "Alter share group offsets" needs > to > >>>>>> write > >>>>>>>> a > >>>>>>>>>>>>> ShareGroupPartitionMetadata record, if the partition is not > >>>> already > >>>>>>>>>>>>> initialized. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 140. In the table for "Delete share group offsets", we need > to > >>>> add > >>>>>> a > >>>>>>>>>> step > >>>>>>>>>>>>> to write a ShareGroupPartitionMetadata record with > >>>> DeletingTopics. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the > >>>>>>>> __consumer_offsets > >>>>>>>>>>>>> topic, which is a compacted topic, right? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests > to > >>>> GC, > >>>>>>>>>> which > >>>>>>>>>>>>> then forwards it to SC? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 147. I guess a client only knows the rebalance triggered by > >>>> itself, > >>>>>>>> but > >>>>>>>>>>>> not > >>>>>>>>>>>>> the ones triggered by other members or topic/partition > changes? 
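
To make the point in 147 concrete -- the client only measures its own part of the rebalance, bracketed by heartbeat-driven state transitions -- here is a minimal sketch of that kind of measurement. It is an illustration only, with simplified state names and plain counters, not the MembershipManagerImpl code.

    // Minimal sketch of the client-side measurement described in 147: the member times
    // only its own part of a rebalance, from receiving a changed assignment in a
    // heartbeat response until it has finished reconciling that assignment.
    public class RebalanceTimerSketch {

        private enum MemberState { STABLE, RECONCILING }

        private MemberState state = MemberState.STABLE;
        private long reconcileStartNanos;
        private long rebalanceCount;
        private long totalRebalanceNanos;

        // Called when a heartbeat response carries an assignment that differs from the
        // one currently in use.
        public void onNewAssignmentReceived() {
            if (state == MemberState.STABLE) {
                state = MemberState.RECONCILING;
                reconcileStartNanos = System.nanoTime();
            }
        }

        // Called once the member has finished applying the assignment. For a share group
        // there are no revocation callbacks or offset commits, so this follows quickly.
        public void onReconciliationComplete() {
            if (state == MemberState.RECONCILING) {
                state = MemberState.STABLE;
                rebalanceCount++;
                totalRebalanceNanos += System.nanoTime() - reconcileStartNanos;
            }
        }

        public long rebalanceCount() {
            return rebalanceCount;
        }

        public double rebalanceLatencyAvgMs() {
            return rebalanceCount == 0 ? 0.0 : (totalRebalanceNanos / 1_000_000.0) / rebalanceCount;
        }
    }

Because a share-group member accepts added and revoked partitions immediately, the measured window is very short, which is why the latency metric adds little and the count and rate are the useful part.
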
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Jun > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield < > >>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>> Thanks for the response. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 123. Of course, ShareGroupHearbeat started off as > >>>>>>>>>> ConsumerGroupHeartbeat > >>>>>>>>>>>>>> and then unnecessary fields were removed. In the network > issue > >>>>>> case, > >>>>>>>>>>>>>> there is not currently enough state being exchanged to be > sure > >>>> an > >>>>>>>>>>>>>> assignment > >>>>>>>>>>>>>> was received. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat, > I > >>>> have > >>>>>>>>>>>> decided > >>>>>>>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest > which > >>>>>>>> mirrors > >>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group > member > >>>>>> does > >>>>>>>>>>>>>> confirm the assignment it is using, and that can be used by > >> the > >>>> GC > >>>>>>>> to > >>>>>>>>>>>>>> safely > >>>>>>>>>>>>>> stop repeating the assignment in heartbeat responses. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 125. Ah, yes. This is indeed something possible with a > >> consumer > >>>>>>>> group > >>>>>>>>>>>>>> and share groups should support it too. This does of course > >>>> imply > >>>>>>>> that > >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue needs an array of > partitions, > >>>> not > >>>>>>>>>>>>>> just the number. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 140. Yes, good spot. There is an inconsistency here in > >> consumer > >>>>>>>> groups > >>>>>>>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at > >> the > >>>>>>>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only > >>>>>> operates > >>>>>>>>>>>>>> at the topic level. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at > >> the > >>>>>>>>>>>> partition > >>>>>>>>>>>>>> level only. You can reset them, but if you’re actively > using a > >>>>>> topic > >>>>>>>>>>>> with > >>>>>>>>>>>>>> a share group, I don’t see why you’d want to delete offsets > >>>> rather > >>>>>>>>>> than > >>>>>>>>>>>>>> reset. If you’ve finished using a topic with a share group > and > >>>>>> want > >>>>>>>> to > >>>>>>>>>>>>>> clean > >>>>>>>>>>>>>> up, use delete. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> So, I’ve changed the AdminClient.deleteConsumerGroupOffsets > to > >>>> be > >>>>>>>>>>>>>> topic-based and the RPCs behind it. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The GC reconciles the cluster state with the > >>>>>>>>>> ShareGroupPartitionMetadata > >>>>>>>>>>>>>> to spot deletion of topics and the like. However, when the > >>>> offsets > >>>>>>>> for > >>>>>>>>>>>>>> a topic were deleted manually, the topic very like still > >> exists > >>>> so > >>>>>>>>>>>>>> reconciliation > >>>>>>>>>>>>>> alone is not going to be able to continue an interrupted > >>>> operation > >>>>>>>>>> that > >>>>>>>>>>>>>> has started. So, I’ve added DeletingTopics back into > >>>>>>>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so > failover > >>>> of > >>>>>> a > >>>>>>>> GC > >>>>>>>>>>>>>> can continue where it left off rather than leaving fragments > >>>>>> across > >>>>>>>>>> the > >>>>>>>>>>>>>> SCs. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 141. That is not required. Because this is not a compacted > >>>> topic, > >>>>>> it > >>>>>>>>>> is > >>>>>>>>>>>>>> not necessary to write tombstones for every key. 
As long as > >>>> there > >>>>>>>> is a > >>>>>>>>>>>>>> clear and unambiguous record for the deletion of the group, > >> that > >>>>>> is > >>>>>>>>>>>> enough. > >>>>>>>>>>>>>> The tombstone for ShareGroupPartitionMetadata is > theoretically > >>>> not > >>>>>>>>>>>>>> required but it’s a single record, rather than one per > member, > >>>> so > >>>>>> I > >>>>>>>>>>>> prefer > >>>>>>>>>>>>>> to leave it as a record that the interactions with the SC > have > >>>>>> been > >>>>>>>>>>>>>> completed. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 142. > >>>>>>>>>>>>>> 142.1. It will prompt the user to confirm they want to > >> continue. > >>>>>>>>>>>>>> This is in common with `kafka-consumer-groups.sh` which > >>>>>> historically > >>>>>>>>>>>>>> has defaulted to --dry-run behaviour, but is due to change > to > >>>>>>>>>> prompting > >>>>>>>>>>>>>> if neither --dry-run nor --execute is specified “in a future > >>>> major > >>>>>>>>>>>>>> release”. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 142.2. It should support partition-level reset, but only > >>>>>> topic-level > >>>>>>>>>>>>>> delete. > >>>>>>>>>>>>>> I have updated the usage text accordingly. This is in common > >>>> with > >>>>>>>>>>>>>> kafka-consumer-groups.sh. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 142.3. --dry-run displays the operation that would be > >> executed. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to > the > >>>>>>>>>>>>>> usage text. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the group > >>>> coordinator > >>>>>>>>>>>>>> for this kind of reason. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 144. That’s the default. If you haven’t asked to release or > >>>>>> reject, > >>>>>>>> it > >>>>>>>>>>>>>> accepts. > >>>>>>>>>>>>>> This is analogous to fetching and committing offsets in a > >>>> consumer > >>>>>>>>>>>> group. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 145. I think this is a good idea, but I would prefer to > defer > >> it > >>>>>>>>>> until a > >>>>>>>>>>>>>> future > >>>>>>>>>>>>>> metrics KIP that I have planned. In KIP-932, I have added > >> basic > >>>>>>>>>> metrics > >>>>>>>>>>>>>> only. > >>>>>>>>>>>>>> For example, you’ll see that there’s no concept of lag yet, > >>>> which > >>>>>>>>>> surely > >>>>>>>>>>>>>> will have relevance for share groups. I plan to create and > >>>> deliver > >>>>>>>> the > >>>>>>>>>>>>>> metrics KIP before share groups are declared ready for > >>>> production. > >>>>>>>>>>>>>> I want the new metrics to be developed with the experience > of > >>>>>>>> running > >>>>>>>>>>>>>> the code. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 146. Milliseconds. KIP updated. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 147. There is a membership state machine in the client that > >>>>>>>>>>>>>> changes states as the ShareGroupHeartbeat requests and > >> responses > >>>>>>>>>>>>>> flow. The duration of a rebalance will be shorter from the > >> point > >>>>>> of > >>>>>>>>>>>>>> view of the share-group consumer because it doesn’t have to > >>>> worry > >>>>>>>>>> about > >>>>>>>>>>>>>> rebalance callbacks and committing offsets as the partitions > >>>> move > >>>>>>>>>>>>>> around, but the overall flow is very similar. So, it’s the > >> state > >>>>>>>>>>>>>> transitions > >>>>>>>>>>>>>> that drive the collection of the rebalance metrics. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 148. Strangely, none of the existing uses of > >>>>>> records-per-request-avg > >>>>>>>>>>>>>> actually have records-per-request-max. 
I tend to err on the > >> side > >>>>>> of > >>>>>>>>>>>>>> consistency, but can’t think of any reason not to add this. > >>>> Done. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 149. There are several error codes for > >>>>>> WriteShareGroupStateResponse: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re > >>>> looking > >>>>>>>>>> for. > >>>>>>>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC can’t > >>>>>>>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the > topic. > >>>>>>>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this > group. > >>>>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for > >> this > >>>>>>>>>>>>>> topic-partition. > >>>>>>>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch. > >>>>>>>>>>>>>> INVALID_REQUEST - There was a problem with the request. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID > > > >>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the response. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 123. I thought the network issue can be covered with the > >> group > >>>>>>>> epoch. > >>>>>>>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up > >> the > >>>>>>>> epoch > >>>>>>>>>>>>>> first, > >>>>>>>>>>>>>>> but doesn't expose the new epoch to members until the > >>>> assignment > >>>>>> is > >>>>>>>>>>>>>>> complete (including initializing the sharePartitionState). > >> Once > >>>>>> the > >>>>>>>>>>>>>>> assignment is complete, GC includes the bumped up epoch and > >> the > >>>>>> new > >>>>>>>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat > >>>>>> Request > >>>>>>>>>>>>>> includes > >>>>>>>>>>>>>>> the new epoch, it means that the member has received the > new > >>>>>>>>>> assignment > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>> GC can exclude the assignment in the heartbeatResponse. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a > >> topic-partition > >>>>>>>> which > >>>>>>>>>>>> is > >>>>>>>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not > really > >>>> part > >>>>>>>> of > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> group yet. So I think it’s permitted to fail the > >>>>>>>>>> AlterShareGroupOffsets > >>>>>>>>>>>>>>> because the topic-partition would not be part of > >>>>>>>>>> ListShareGroupOffsets > >>>>>>>>>>>>>>> result." > >>>>>>>>>>>>>>> A user may want to initialize SPSO (e.g. based on > timestamp) > >>>>>> before > >>>>>>>>>> the > >>>>>>>>>>>>>>> application is first used. If we reject > >> AlterShareGroupOffsets > >>>>>> when > >>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be > >>>>>> forced > >>>>>>>> to > >>>>>>>>>>>>>> start > >>>>>>>>>>>>>>> the application with the wrong SPSO first and then reset > it, > >>>>>> which > >>>>>>>>>> will > >>>>>>>>>>>>>> be > >>>>>>>>>>>>>>> inconvenient. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of > >>>> SPSO > >>>>>>>> for > >>>>>>>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue > >>>> only > >>>>>>>>>> tracks > >>>>>>>>>>>>>>> consecutive partitions. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 141. Delete share group: Should GC also write a tombstone > for > >>>>>> each > >>>>>>>>>>>>>>> ShareGroupMemberMetadata record? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 142. 
kafka-share-groups.sh > >>>>>>>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is > >>>>>> specified > >>>>>>>>>> for > >>>>>>>>>>>>>>> reset-offsets? > >>>>>>>>>>>>>>> 142.2 Should it support --delete-offsets and > --reset-offsets > >> at > >>>>>> the > >>>>>>>>>>>>>>> partition level to match the AdminClient api? > >>>>>>>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't > >>>> carry > >>>>>>>> the > >>>>>>>>>>>>>>> dry-run flag. > >>>>>>>>>>>>>>> 142.4 --state [String]: What are the valid state values? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the > >> share-partition > >>>>>>>>>> leader. > >>>>>>>>>>>>>> How > >>>>>>>>>>>>>>> could a user describe the share group offset when there is > no > >>>>>>>> member? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to > >>>>>> accept > >>>>>>>>>>>>>> consumed > >>>>>>>>>>>>>>> messages? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 145. It would be useful to add a broker side metric that > >>>> measures > >>>>>>>> the > >>>>>>>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g. > >> the > >>>>>>>>>> fraction > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>> the time that SPL is blocked because > >>>>>>>>>>>>>> group.share.partition.max.record.locks > >>>>>>>>>>>>>>> is reached. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 146. heartbeat-response-time-max: What's the unit? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 147. rebalance related metrics: How does a share consumer > >> know > >>>>>> when > >>>>>>>>>>>> there > >>>>>>>>>>>>>>> is a rebalance and how does it measure the rebalance time? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 148. records-per-request-avg: Should we pair it with > >>>>>>>>>>>>>>> records-per-request-max? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 149. If the shareGroupState is not available, what error > code > >>>> is > >>>>>>>> used > >>>>>>>>>>>> in > >>>>>>>>>>>>>>> WriteShareGroupStateResponse? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield < > >>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>> Thanks for your reply. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 123. The GC only sends the assignment in > ShareGroupHeartbeat > >>>>>>>>>>>>>>>> response if the member has just joined, or the assignment > >> has > >>>>>> been > >>>>>>>>>>>>>>>> recalculated. This latter condition is met when the GC > fails > >>>>>> over. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> With a consumer group, it works a little differently. The > >>>>>>>>>> heartbeating > >>>>>>>>>>>>>>>> occurs asynchronously with the reconciliation process in > the > >>>>>>>> client. > >>>>>>>>>>>>>>>> The client has reconciliation callbacks, as well as > offsets > >> to > >>>>>>>>>> commit > >>>>>>>>>>>>>>>> before it can confirm that revocation has occured. So, it > >>>> might > >>>>>>>> take > >>>>>>>>>>>>>>>> multiple heartbeats to complete the installation of a new > >>>> target > >>>>>>>>>>>>>>>> assignment in the client. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> With a share group, it’s more brutal. The GC sends the > >>>>>> assignment > >>>>>>>>>>>>>>>> in a heartbeat response, and the client is assumed to have > >>>> acted > >>>>>>>> on > >>>>>>>>>>>>>>>> It immediately. 
Since share group assignment is really > about > >>>>>>>>>> balancing > >>>>>>>>>>>>>>>> consumers across the available partitions, rather than > >> safely > >>>>>>>>>> handing > >>>>>>>>>>>>>>>> ownership of partitions between consumers, there is less > >>>>>>>>>> coordination. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I wonder whether there is one situation that does need > >>>>>>>> considering. > >>>>>>>>>>>>>>>> If the network connection between the client and the GC is > >>>> lost, > >>>>>>>> it > >>>>>>>>>> is > >>>>>>>>>>>>>>>> possible that a response containing the assignment is > lost. > >>>>>> Then, > >>>>>>>>>>>>>>>> the connection will be reestablished, and the assignment > >> will > >>>>>> only > >>>>>>>>>> be > >>>>>>>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of > >>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one > >>>>>>>>>>>>>>>> way to close that. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 125. Yes, you are right. The reconciliation that I > describe > >> in > >>>>>> the > >>>>>>>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets > >> is > >>>>>>>> called > >>>>>>>>>>>>>>>> for a topic-partition which is not yet in > >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, > >>>>>>>>>>>>>>>> it’s not really part of the group yet. So I think it’s > >>>> permitted > >>>>>>>> to > >>>>>>>>>>>> fail > >>>>>>>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition > would > >>>> not > >>>>>>>> be > >>>>>>>>>>>>>>>> part of ListShareGroupOffsets result. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition > >>>> which > >>>>>> is > >>>>>>>>>> in > >>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling > >>>>>>>>>>>>>>>> InitializeShareGroupState to set its offset would be > >>>> sufficient > >>>>>>>>>>>> without > >>>>>>>>>>>>>>>> writing ShareGroupPartitionMetadata again. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 138. Added. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 139. This week, I have been experimenting with using the > >>>>>> existing > >>>>>>>>>>>>>>>> consumer metrics with share consumer code. That was my > plan > >> to > >>>>>> get > >>>>>>>>>>>>>>>> started with metrics. While they work to some extent, I am > >> not > >>>>>>>>>>>> entirely > >>>>>>>>>>>>>>>> happy with the result. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I have added a basic set of share consumer-specific > metrics > >> to > >>>>>>>>>> KIP-932 > >>>>>>>>>>>>>>>> which I think is a step in the right direction. I > >> anticipate a > >>>>>>>>>> future > >>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>> that defines many more metrics and tackles concepts such > as > >>>> what > >>>>>>>>>>>>>>>> “lag” means for a share group. The metrics I’ve included > are > >>>>>>>> simply > >>>>>>>>>>>>>>>> counting and measuring the operations, which is a good > >> start. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 54. I think either with or without > >>>>>>>> InitializingTopics/DeletingTopics > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think > >> there > >>>>>> is > >>>>>>>>>>>>>>>> a deeper point behind what you said. The GC needs to be > >>>>>> responsive > >>>>>>>>>>>>>>>> to topic creation and deletion, and addition of partitions > >> in > >>>>>>>> cases > >>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics. > So, > >>>> we > >>>>>>>> are > >>>>>>>>>>>>>>>> probably not saving anything by having those fields. 
As a > >>>>>> result, > >>>>>>>> I > >>>>>>>>>>>> have > >>>>>>>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated > >> the > >>>>>> KIP > >>>>>>>>>>>>>>>> accordingly. The GC needs to reconcile its view of the > >>>>>> initialised > >>>>>>>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and > >> it > >>>>>> will > >>>>>>>>>>>>>>>> initialize or delete share-group state accordingly. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I have made one more change to the KIP. The > >>>>>> SharePartitionAssignor > >>>>>>>>>>>>>>>> interface has been renamed to > >>>>>>>>>>>>>>>> > o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor > >>>> and > >>>>>>>> it > >>>>>>>>>>>>>>>> now extends the PartitionAssignor interface. It’s > >> essentially > >>>> a > >>>>>>>>>> marker > >>>>>>>>>>>>>>>> of which partition assignors can be used with share > groups. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao > <j...@confluent.io.INVALID > >>> > >>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to > the > >>>>>> GC." > >>>>>>>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment > >> to > >>>> GC > >>>>>>>> to > >>>>>>>>>>>>>>>>> avoid GC including the assignment in the heartbeat > response > >>>>>>>>>>>>>>>> continuously. I > >>>>>>>>>>>>>>>>> assume this is done by including the new group epoch in > the > >>>>>>>>>> heartbeat > >>>>>>>>>>>>>>>>> response. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 125. It's possible that the share partition has never > been > >>>>>>>>>>>> initialized > >>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write > >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it > would > >>>>>>>>>>>>>> reinitialize > >>>>>>>>>>>>>>>>> the share partition and lose the effect of > >>>>>>>> AlterShareGroupOffsets. > >>>>>>>>>> If > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> partition has already been initialized and it's recorded > >>>>>>>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to > write > >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata again when handling > >>>>>>>>>>>> AlterShareGroupOffsets. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 54. "I don’t think there is any redundancy. The > >>>>>>>>>>>>>> ShareGroupMemberMetadata > >>>>>>>>>>>>>>>>> does include a list of subscribed topics. However, if > there > >>>> is > >>>>>> a > >>>>>>>>>>>> period > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>> time in which no members are subscribed to a particular > >>>> topic, > >>>>>> it > >>>>>>>>>>>> does > >>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>> mean that the topic’s ShareGroupState should be > immediately > >>>>>>>>>> removed, > >>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata > >>>>>> records > >>>>>>>>>>>>>>>> containing > >>>>>>>>>>>>>>>>> that topic." > >>>>>>>>>>>>>>>>> I am still trying to understand the value of > >>>> InitializingTopics > >>>>>>>> and > >>>>>>>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. 
They > >> are > >>>>>> used > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> remember the intention of an operation. However, GC still > >>>> needs > >>>>>>>> to > >>>>>>>>>>>>>> handle > >>>>>>>>>>>>>>>>> the case when the intention is not safely recorded. If GC > >>>> wants > >>>>>>>> to > >>>>>>>>>>>>>>>>> initialize a new topic/partition, a simpler approach is > for > >>>> it > >>>>>> to > >>>>>>>>>>>> send > >>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>> InitializeShareGroupState to the share coordinator and > >> after > >>>>>>>>>>>> receiving > >>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue > >> with > >>>>>> the > >>>>>>>>>>>>>>>>> initialized partition included in InitializedTopics. This > >>>>>> saves a > >>>>>>>>>>>>>> record > >>>>>>>>>>>>>>>>> write. It's possible for GC to fail in between the two > >> steps. > >>>>>> On > >>>>>>>>>>>>>>>> failover, > >>>>>>>>>>>>>>>>> the new GC just repeats the process. The case that you > >>>>>> mentioned > >>>>>>>>>>>> above > >>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics > >> of > >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member > subscribes > >> to > >>>>>> it, > >>>>>>>>>> we > >>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>> still keep the ShareGroupState as long as the topic still > >>>>>> exists. > >>>>>>>>>> The > >>>>>>>>>>>>>>>> same > >>>>>>>>>>>>>>>>> optimization could be applied to DeletingTopics too. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield < > >>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute > >> the > >>>>>>>>>>>>>> assignment > >>>>>>>>>>>>>>>>>> for every member. However, the impact of re-assignment > is > >>>> not > >>>>>>>> that > >>>>>>>>>>>>>>>> onerous. > >>>>>>>>>>>>>>>>>> If the recomputed assignments are the same, which they > may > >>>>>> well > >>>>>>>>>> be, > >>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>> is no impact on the members at all. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On receiving the new assignment, the member adjusts the > >>>>>>>>>>>>>> topic-partitions > >>>>>>>>>>>>>>>>>> in its share sessions, removing those which were revoked > >> and > >>>>>>>>>> adding > >>>>>>>>>>>>>>>> those > >>>>>>>>>>>>>>>>>> which were assigned. It is able to acknowledge the > records > >>>> it > >>>>>>>>>>>> fetched > >>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>> the partitions which have just been revoked, and it > >> doesn’t > >>>>>> need > >>>>>>>>>> to > >>>>>>>>>>>>>>>> confirm > >>>>>>>>>>>>>>>>>> the assignment back to the GC. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 125. I don’t think the GC needs to write > >>>>>>>>>> ShareGroupPartitionMetadata > >>>>>>>>>>>>>>>>>> when processing AlterShareGroupOffsets. This is because > >> the > >>>>>>>>>>>> operation > >>>>>>>>>>>>>>>>>> happens as a result of an explicit administrative action > >> and > >>>>>> it > >>>>>>>> is > >>>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>> to return a specific error code for each > topic-partition. 
> >>>> The > >>>>>>>>>> cases > >>>>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is > >>>> added > >>>>>> or > >>>>>>>>>>>>>> removed > >>>>>>>>>>>>>>>>>> from the subscribed topics, or the number of partitions > >>>>>> changes. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for > a > >>>>>>>> cluster > >>>>>>>>>> to > >>>>>>>>>>>>>>>>>> prevent > >>>>>>>>>>>>>>>>>> a group from having an excessively low value. Config > >> added. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 131. I have changed it to > >>>>>>>> group.share.partition.max.record.locks. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 136. When GC failover occurs, the GC gaining ownership > >> of a > >>>>>>>>>>>> partition > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> the __consumer_offsets topic replays the records to > build > >>>> its > >>>>>>>>>> state. > >>>>>>>>>>>>>>>>>> In the case of a share group, it learns: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> * The share group and its group epoch > (ShareGroupMetadata) > >>>>>>>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata) > >>>>>>>>>>>>>>>>>> * The list of share-partitions > >> (ShareGroupPartitionMetadata) > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> It will recompute the assignments in order to respond to > >>>>>>>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the > >>>> group > >>>>>>>>>> epoch. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I will update the KIP accordingly to confirm the > >> behaviour. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the > >>>>>>>>>>>>>>>>>> group-coordinator-metrics > >>>>>>>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs > >>>> offset > >>>>>>>>>>>> commit, > >>>>>>>>>>>>>>>>>> the share group equivalent is performed by the SPL. So, > >> I’ve > >>>>>>>>>> grouped > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> concepts which relate to the group in > >>>>>> group-coordinator-metrics. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> The SC reports the metrics in the > >> share-coordinator-metrics > >>>>>>>> group. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 137.2: There is one metric in both groups - > >>>>>> partition-load-time. > >>>>>>>>>> In > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> SC > >>>>>>>>>>>>>>>>>> group, > >>>>>>>>>>>>>>>>>> it refers to the time loading data from the share-group > >>>> state > >>>>>>>>>> topic > >>>>>>>>>>>> so > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC > >>>>>> group, > >>>>>>>>>>>>>>>>>> it refers to the time to read the state from the > >> persister. > >>>>>>>> Apart > >>>>>>>>>>>> from > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to > be > >>>> very > >>>>>>>>>>>> close. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Later, for a cluster which is using a custom persister, > >> the > >>>>>>>>>>>>>>>>>> share-coordinator > >>>>>>>>>>>>>>>>>> metrics would likely not be reported, and the persister > >>>> would > >>>>>>>> have > >>>>>>>>>>>> its > >>>>>>>>>>>>>>>> own > >>>>>>>>>>>>>>>>>> metrics. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 137.3: Correct. Fixed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the > >>>> internal > >>>>>>>>>> topic. > >>>>>>>>>>>>>>>>>> I’ve tweaked the description. 
> >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao > >> <j...@confluent.io.INVALID > >>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 123. "The share group does not persist the target > >>>>>> assignment." > >>>>>>>>>>>>>>>>>>> What's the impact of this? Everytime that GC fails > over, > >> it > >>>>>>>> needs > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> recompute the assignment for every member. Do we expect > >> the > >>>>>>>>>> member > >>>>>>>>>>>>>>>>>>> assignment to change on every GC failover? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 125. Should the GC also write > >> ShareGroupPartitionMetadata? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when > >>>>>>>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds > >>>> good. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 130. Should we have a > >>>>>> group.share.min.record.lock.duration.ms > >>>>>>>> to > >>>>>>>>>>>>>> pair > >>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>> group.share.max.record.lock.duration.ms? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 131. Sounds good. The name > >>>>>>>>>> group.share.record.lock.partition.limit > >>>>>>>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>>>>> seem very intuitive. How about something > >>>>>>>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I > >> guess > >>>> it > >>>>>>>>>> needs > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> compute member reassignment and check if there is any > new > >>>>>>>>>>>>>>>> topic/partition > >>>>>>>>>>>>>>>>>>> matching the share subscription. Does it bump up the > >> group > >>>>>>>> epoch? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 137. Metrics: > >>>>>>>>>>>>>>>>>>> 137.1 It would be useful to document who reports each > >>>> metric. > >>>>>>>> Is > >>>>>>>>>> it > >>>>>>>>>>>>>> any > >>>>>>>>>>>>>>>>>>> broker, GC, SC or SPL? > >>>>>>>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at > >> SPL > >>>> or > >>>>>>>> SC? > >>>>>>>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the > >>>> share-group > >>>>>>>>>> state > >>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>> the share-group state partitions loaded in the last 30 > >>>>>>>> seconds." > >>>>>>>>>>>>>>>>>>> The window depends on metrics.num.samples and > >>>>>>>>>>>>>> metrics.sample.window.ms > >>>>>>>>>>>>>>>>>>> and is not always 30 seconds, right? > >>>>>>>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more? > >>>> Does > >>>>>> it > >>>>>>>>>>>>>> include > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> time to write to the internal topic? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>>>> Thanks for your comments. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 120. Thanks. Fixed. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which > >>>> snapshot > >>>>>>>>>>>>>>>>>>>> the update applies to. 
It should of course be the > >> snapshot > >>>>>>>> that > >>>>>>>>>>>>>>>> precedes > >>>>>>>>>>>>>>>>>>>> it in the log. It’s just there to provide a > consistency > >>>>>> check. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing > >>>>>> StateEpoch. > >>>>>>>>>> It > >>>>>>>>>>>>>>>>>>>> isn’t any more. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue > >> includes > >>>>>>>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact, > there > >> is > >>>>>>>>>>>>>>>> considerable > >>>>>>>>>>>>>>>>>>>> divergence between the KIP and the code for this > record > >>>>>> value > >>>>>>>>>>>> schema > >>>>>>>>>>>>>>>>>>>> which I expect will be resolved when the migration > code > >>>> has > >>>>>>>> been > >>>>>>>>>>>>>>>>>>>> completed. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 123. The share group does not persist the target > >>>> assignment. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 124. Share groups have three kinds of record: > >>>>>>>>>>>>>>>>>>>> i) ShareGroupMetadata > >>>>>>>>>>>>>>>>>>>> - this contains the group epoch and is written > whenever > >>>> the > >>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>> epoch changes. > >>>>>>>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata > >>>>>>>>>>>>>>>>>>>> - this does not contain the group epoch. > >>>>>>>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata > >>>>>>>>>>>>>>>>>>>> - this currently contains the epoch, but I think that > is > >>>>>>>>>>>>>> unnecessary. > >>>>>>>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata > >>>> definition > >>>>>>>>>>>>>>>>>>>> contains the group epoch, but the value appears never > to > >>>> be > >>>>>>>> set. > >>>>>>>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is > >>>> removing > >>>>>>>> it. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I have removed the Epoch from > >> ShareGroupPartitionMetadata. > >>>>>>>>>>>>>>>>>>>> The only purpose of the persisting the epoch for a > share > >>>>>> group > >>>>>>>>>> is > >>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> when a group coordinator takes over the share group, > it > >> is > >>>>>>>> able > >>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> continue the sequence of epochs. > >>>>>> ShareGroupMetadataValue.Epoch > >>>>>>>>>>>>>>>>>>>> is used for this. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case > >> and > >>>>>>>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP > >>>>>>>> updated. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted > >> when > >>>> it > >>>>>>>> has > >>>>>>>>>>>> no > >>>>>>>>>>>>>>>>>>>> members, so the tombstones for > ShareGroupMemberMetadata > >>>> will > >>>>>>>>>>>>>>>>>>>> have been written when the members left. I have > >> clarified > >>>>>>>> this. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group > >> epoch. > >>>>>>>> When > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> group coordinator is initializing the share-group > state > >>>> the > >>>>>>>>>> first > >>>>>>>>>>>>>> time > >>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> a share-partition is being added to an assignment in > the > >>>>>>>> group, > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the > group > >>>>>> epoch > >>>>>>>>>>>>>>>>>>>> increases over time, the share coordinator is entirely > >>>>>>>> unaware. 
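
A rough sketch of the point in 127 -- the group epoch is handed to the share coordinator exactly once, as the initial state epoch, and later group epoch bumps are not propagated. The names here (Persister, maybeInitialize, and so on) are hypothetical placeholders for illustration, not the actual group coordinator code.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustration only: the group coordinator uses the current group epoch as the
    // state epoch when a share-partition is initialized for the first time, and never
    // tells the share coordinator about subsequent group epoch increases.
    public class StateEpochSketch {

        interface Persister {
            void initializeState(String groupId, String topic, int partition, int stateEpoch);
        }

        private final Persister persister;
        private final Map<String, Set<Integer>> initializedPartitions = new HashMap<>();

        public StateEpochSketch(Persister persister) {
            this.persister = persister;
        }

        // Called before a share-partition is included in an assignment for the first time.
        public void maybeInitialize(String groupId, String topic, int partition, int groupEpoch) {
            Set<Integer> partitions = initializedPartitions.computeIfAbsent(topic, t -> new HashSet<>());
            if (partitions.add(partition)) {
                // First time this share-partition enters an assignment in the group:
                // the current group epoch becomes the state epoch.
                persister.initializeState(groupId, topic, partition, groupEpoch);
            }
            // Subsequent group epoch increases are not sent to the share coordinator.
        }
    }
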
> >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> When the first consumer for a share-partition fetches > >>>>>> records > >>>>>>>>>>>> from a > >>>>>>>>>>>>>>>>>>>> share-partition leader, the SPL calls the share > >>>> coordinator > >>>>>> to > >>>>>>>>>>>>>>>>>>>> ReadShareGroupState. If the SPL has previously read > the > >>>>>>>>>>>> information > >>>>>>>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms > >>>> it's > >>>>>> up > >>>>>>>>>> to > >>>>>>>>>>>>>> date > >>>>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Even if many consumers are joining at the same time, > any > >>>>>>>>>>>>>>>> share-partition > >>>>>>>>>>>>>>>>>>>> which is being initialized will not be included in > their > >>>>>>>>>>>>>> assignments. > >>>>>>>>>>>>>>>>>> Once > >>>>>>>>>>>>>>>>>>>> the initialization is complete, the next rebalance > will > >>>>>> assign > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> partition > >>>>>>>>>>>>>>>>>>>> to some consumers which will discover this by > >>>>>>>>>> ShareGroupHeartbeat > >>>>>>>>>>>>>>>>>>>> response. And then, the fetching begins. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s > read > >>>> the > >>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait > up > >> to > >>>>>>>>>>>> MaxWaitMs > >>>>>>>>>>>>>>>>>>>> and then it can return an empty set of records if it’s > >>>> still > >>>>>>>> not > >>>>>>>>>>>>>>>> ready. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a > >>>> topic > >>>>>>>> with > >>>>>>>>>>>>>> many > >>>>>>>>>>>>>>>>>>>> partitions is added to the subscribed topics for a > share > >>>>>>>> group, > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>> fact > >>>>>>>>>>>>>>>>>>>> that the assignments will only start to include the > >>>>>> partitions > >>>>>>>>>> as > >>>>>>>>>>>>>>>> their > >>>>>>>>>>>>>>>>>>>> initialization completes should soften the impact. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature > when > >>>> it’s > >>>>>>>>>>>>>> finished > >>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and > >>>>>>>>>>>>>> `group.version`. > >>>>>>>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the > >>>>>>>>>>>>>>>> `group.share.enable` > >>>>>>>>>>>>>>>>>>>> configuration will turn it on. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I have described `group.share.enable` as an internal > >>>>>>>>>> configuration > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> the KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms` > >>>>>> applies > >>>>>>>>>> to > >>>>>>>>>>>>>>>> groups > >>>>>>>>>>>>>>>>>>>> which do not specify a group-level configuration for > >> lock > >>>>>>>>>>>> duration. > >>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>> minimum and maximum for this configuration are > intended > >> to > >>>>>>>> give > >>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>> sensible bounds. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> If a group does specify its own ` > >>>>>>>>>>>>>> group.share.record.lock.duration.ms > >>>>>>>>>>>>>>>> `, > >>>>>>>>>>>>>>>>>>>> the broker-level ` > >> group.share.max.record.lock.duration.ms > >>>> ` > >>>>>>>>>> gives > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value > >> for > >>>>>> all > >>>>>>>>>>>>>> groups. 
> >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> While editing, I renamed ` > >>>>>>>>>> group.share.record.lock.duration.max.ms > >>>>>>>>>>>> ` > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for > >> consistency > >>>>>>>> with > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> rest of the min/max configurations. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 131. This is the limit per partition so you can go > wider > >>>>>> with > >>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>>>> partitions. > >>>>>>>>>>>>>>>>>>>> I have set the initial value low for safety. I expect > to > >>>> be > >>>>>>>> able > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> increase this > >>>>>>>>>>>>>>>>>>>> significantly when we have mature code which has been > >>>>>>>>>>>> battle-tested. > >>>>>>>>>>>>>>>>>>>> Rather than try to guess how high it can safely go, > I’ve > >>>>>> erred > >>>>>>>>>> on > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> side > >>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> caution and expect to open it up in a future KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two > >>>> group > >>>>>>>>>>>>>>>>>>>> configurations, > >>>>>>>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms > and > >>>>>>>>>>>>>>>>>>>> group.share.heartbeat.timeout.ms . The configurations > >> you > >>>>>>>>>>>> mentioned > >>>>>>>>>>>>>>>>>>>> are the bounds for the group-level configurations. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to > >> mirror > >>>>>> the > >>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>>>> `group.consumer.max.size`. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid > >>>>>> assignors > >>>>>>>>>> for > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> cluster. > >>>>>>>>>>>>>>>>>>>> When the assignors are configurable at the group > level, > >>>> the > >>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>> configuration > >>>>>>>>>>>>>>>>>>>> will only be permitted to name an assignor which is in > >>>> this > >>>>>>>>>> list. > >>>>>>>>>>>>>> For > >>>>>>>>>>>>>>>>>> now, > >>>>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>> is no group configuration for assignor, so all groups > >> get > >>>>>> the > >>>>>>>>>> one > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>>>> assignor in the list. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a > >> cluster > >>>>>>>> with a > >>>>>>>>>>>>>> small > >>>>>>>>>>>>>>>>>>>> number of > >>>>>>>>>>>>>>>>>>>> of brokers and a lot of share group activity, it may > be > >>>>>>>>>>>> appropriate > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> increase > >>>>>>>>>>>>>>>>>>>> this. We will be able to give tuning advice once we > have > >>>>>>>>>>>> experience > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> performance impact of increasing it. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao > >>>> <j...@confluent.io.INVALID > >>>>>>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 120. There is still reference to > >>>> ConsumerGroupMetadataKey. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 121. 
ShareUpdateValue.SnapshotEpoch: Should we change > >> it > >>>>>>>> since > >>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>>> not a > >>>>>>>>>>>>>>>>>>>>> snapshot? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, > >> but > >>>>>>>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know > how > >>>> the > >>>>>>>>>> epoch > >>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> being > >>>>>>>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in > >> the > >>>>>>>> share > >>>>>>>>>>>>>>>> group? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 123. There is no equivalent of > >>>>>>>>>>>> ConsumerGroupTargetAssignmentMember > >>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>> ShareGroup. How does the shareGroup persist the > member > >>>>>>>>>>>> assignment? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a > topic-partition > >> is > >>>>>>>>>>>> assigned > >>>>>>>>>>>>>>>> to a > >>>>>>>>>>>>>>>>>>>>> member of a share group for the first time, the group > >>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>> writes > >>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the > >>>>>> __consumer_offsets > >>>>>>>>>>>>>> topic". > >>>>>>>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata > with > >>>> the > >>>>>>>>>>>> bumped > >>>>>>>>>>>>>>>>>>>> epoch? > >>>>>>>>>>>>>>>>>>>>> In general, is there a particular order to bump up > the > >>>>>> epoch > >>>>>>>> in > >>>>>>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the > >>>>>>>>>>>>>>>> sharePartitionLeader? > >>>>>>>>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>>>>>>> would be useful to make this clear in other state > >>>> changing > >>>>>>>>>>>>>> operations > >>>>>>>>>>>>>>>>>>>> too. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and > >>>> share > >>>>>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>>>>> Only empty share groups support this operation. The > >> group > >>>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>>>> sends > >>>>>>>>>>>>>>>>>>>>> an InitializeShareGroupState request to the share > >>>>>>>> coordinator. > >>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the > new > >>>>>> state > >>>>>>>>>>>> epoch > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> __share_group_state topic." > >>>>>>>>>>>>>>>>>>>>> Does the operation need to write > >>>>>> ShareGroupPartitionMetadata > >>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also > need > >> to > >>>>>>>>>> write a > >>>>>>>>>>>>>>>>>>>> tombstone > >>>>>>>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new > consumer > >>>>>>>> joining > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the > >>>>>>>>>>>>>> ShareCoordinator > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the > >>>>>>>>>> SharePartition > >>>>>>>>>>>>>>>> leader > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> reinitialize the state with > ReadShareGroupOffsetsState. 
> >>>> If > >>>>>>>> many > >>>>>>>>>>>>>>>>>> consumers > >>>>>>>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would > >>>> there > >>>>>> be > >>>>>>>>>> too > >>>>>>>>>>>>>>>> much > >>>>>>>>>>>>>>>>>>>> load > >>>>>>>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the > >>>>>>>>>>>> group.version > >>>>>>>>>>>>>>>>>>>>> feature, should we make it an internal config? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this > seems > >>>>>>>>>> redundant > >>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>> group.share.enable? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 130. Why do we have both > >>>>>> group.share.record.lock.duration.ms > >>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with > its > >>>> own > >>>>>>>> max > >>>>>>>>>>>>>>>> value? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults > >> to > >>>>>> 200. > >>>>>>>>>>>> This > >>>>>>>>>>>>>>>>>> limits > >>>>>>>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right? > >> If > >>>>>>>> there > >>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>>>>> records per batch, it could be even smaller. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 132. Why do we need all three of > >>>>>>>>>> group.share.session.timeout.ms, > >>>>>>>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and > >>>>>>>>>>>>>>>>>>>> group.share.max.session.timeout.ms? > >>>>>>>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for > >>>>>>>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 133. group.share.max.size: Would > >>>>>>>>>>>> group.share.max.members.per.group > >>>>>>>>>>>>>>>> be a > >>>>>>>>>>>>>>>>>>>>> more intuitive name? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a > >>>> list? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share > >>>>>> coordinator > >>>>>>>>>> or > >>>>>>>>>>>>>> per > >>>>>>>>>>>>>>>>>>>> broker? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>>>>>> Thanks for you reply. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to > >>>>>>>>>> FirstOffset. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP. > >> Personally, > >>>> I > >>>>>>>>>> don’t > >>>>>>>>>>>>>> mind > >>>>>>>>>>>>>>>>>>>> having > >>>>>>>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 114. Done. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record > >>>> because > >>>>>>>>>> there > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>>> much > >>>>>>>>>>>>>>>>>>>>>> data to consume, committing on every record is OK. > >> It’s > >>>>>>>>>>>>>> inefficient > >>>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>>>> acceptable. 
> >>>>>>>>>>>>>>>>>>>>>> If the first poll returns just one record, but many > >> more > >>>>>>>> have > >>>>>>>>>>>>>> piled > >>>>>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>>>>>>> while > >>>>>>>>>>>>>>>>>>>>>> the first one was being processed, the next poll has > >> the > >>>>>>>>>>>>>> opportunity > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be > >>>>>>>> committed > >>>>>>>>>>>>>>>>>> together. > >>>>>>>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to > >>>> return > >>>>>>>>>>>> batches > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> records when the records are available is the > approach > >>>> we > >>>>>>>> will > >>>>>>>>>>>>>> take > >>>>>>>>>>>>>>>>>>>> here. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 116. Good idea. Done. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration > >> section. > >>>>>> Let > >>>>>>>> me > >>>>>>>>>>>>>> know > >>>>>>>>>>>>>>>>>> what > >>>>>>>>>>>>>>>>>>>>>> you think. > >>>>>>>>>>>>>>>>>>>>>> I am discussing the configuration to enable the > >> feature > >>>> in > >>>>>>>> the > >>>>>>>>>>>>>>>> mailing > >>>>>>>>>>>>>>>>>>>>>> list with > >>>>>>>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in > >>>> this > >>>>>>>> area > >>>>>>>>>>>>>>>> still. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao > >>>>>> <j...@confluent.io.INVALID > >>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple > >>>> times > >>>>>>>>>> when > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same > state > >>>>>> epoch > >>>>>>>>>> was > >>>>>>>>>>>>>>>>>> reused > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>> initialize the state." > >>>>>>>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure > that > >>>> it > >>>>>>>> has > >>>>>>>>>>>>>>>> received > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused? > >>>>>>>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger > a > >>>>>>>>>> rebalance" > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group > >>>> epoch > >>>>>>>> to > >>>>>>>>>> be > >>>>>>>>>>>>>>>>>> bumped > >>>>>>>>>>>>>>>>>>>>>> too? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become > >>>>>> FirstOffset > >>>>>>>>>> in > >>>>>>>>>>>>>> ALL > >>>>>>>>>>>>>>>>>>>>>> schemas > >>>>>>>>>>>>>>>>>>>>>>> defined in the KIP." > >>>>>>>>>>>>>>>>>>>>>>> Yes, that seems better to me. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for > >> 3." > >>>>>>>>>>>>>>>>>>>>>>> Should we document it? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer > >> configuration > >>>> is > >>>>>>>>>>>>>>>> deprecated > >>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the > >>>> shareConsumer > >>>>>>>>>>>>>>>>>> configuration. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 115. 
I am wondering if it's a good idea to always > >>>> commit > >>>>>>>> acks > >>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each > batch > >>>> may > >>>>>>>>>> only > >>>>>>>>>>>>>>>>>> contain > >>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>>> single record and each poll() only returns a single > >>>>>> batch. > >>>>>>>>>> This > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>> cause > >>>>>>>>>>>>>>>>>>>>>>> each record to be committed individually. Is there > a > >>>> way > >>>>>>>> for > >>>>>>>>>> a > >>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>> optimize this? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated > >>>> acls? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and > >> RPCs, > >>>> it > >>>>>>>>>> would > >>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>> useful > >>>>>>>>>>>>>>>>>>>>>>> to document the upgrade process. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 41. > >>>>>>>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch > >> in > >>>>>> the > >>>>>>>>>>>>>> response > >>>>>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupState. When it becomes a > >> share-partition > >>>>>>>>>> leader, > >>>>>>>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the > things > >>>> it > >>>>>>>>>> learns > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> current state epoch. Then it uses the state epoch > in > >>>> all > >>>>>>>>>>>>>>>> subsequent > >>>>>>>>>>>>>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to > >>>> prevent > >>>>>>>>>>>> writes > >>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>> a previous state epoch, which are very unlikely > but > >>>>>> which > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>> mean > >>>>>>>>>>>>>>>>>>>>>>>> that a leader was using an out-of-date epoch and > was > >>>>>>>> likely > >>>>>>>>>> no > >>>>>>>>>>>>>>>>>> longer > >>>>>>>>>>>>>>>>>>>>>>>> the current leader at all, perhaps due to a long > >> pause > >>>>>> for > >>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>> reason. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the > SPSO, > >>>>>>>>>> wouldn’t > >>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>> need > >>>>>>>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to > avoid > >>>> yet > >>>>>>>>>>>> another > >>>>>>>>>>>>>>>>>>>>>>>> inter-broker > >>>>>>>>>>>>>>>>>>>>>>>> hop. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 42. > >>>>>>>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share > >>>> group > >>>>>>>>>>>> offset > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>> altered > >>>>>>>>>>>>>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the > group > >>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>> WILL > >>>>>>>>>>>>>>>>>>>>>>>> update the state epoch. 
I don’t think it needs to > >>>> update > >>>>>>>> the > >>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>> epoch > >>>>>>>>>>>>>>>>>>>>>>>> at the same time (although it could) because the > >> group > >>>>>>>> epoch > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>>>> been bumped when the group became empty. If the > >> share > >>>>>>>> group > >>>>>>>>>>>>>> offset > >>>>>>>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains > >>>> empty, > >>>>>> it > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to > >>>>>> initialize > >>>>>>>>>> the > >>>>>>>>>>>>>>>> state. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO > as > >> a > >>>>>>>> result > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>> the usual flow of record delivery, it does not > >> update > >>>>>> the > >>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>> epoch. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the > >>>>>>>> alteration > >>>>>>>>>>>>>>>> because, > >>>>>>>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response > >> will > >>>>>>>>>> contain > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to > >> be > >>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> last-resort way of catching this. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first > >>>>>>>> ShareFetch > >>>>>>>>>>>>>>>>>> request, > >>>>>>>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to > >>>>>>>>>>>>>>>> ReadShareGroupState. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain > >>>> constant, > >>>>>>>>>> but, > >>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might > >>>>>> change. > >>>>>>>>>> As a > >>>>>>>>>>>>>>>>>> result, > >>>>>>>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share > >>>>>>>> sessions > >>>>>>>>>>>>>>>>>>>> transitions > >>>>>>>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the > >> share > >>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>> transitioning > >>>>>>>>>>>>>>>>>>>>>>>> from empty to non-empty, for the share-partition > >>>> leader > >>>>>> to > >>>>>>>>>>>> issue > >>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state > >>>> epoch. > >>>>>> If > >>>>>>>>>> its > >>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>>>>>> epoch is out of date, it can then > >> ReadShareGroupState > >>>> to > >>>>>>>>>>>>>>>>>>>> re-initialize. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to > FirstOffset, > >>>> we > >>>>>>>> need > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having > >>>>>> reviewed > >>>>>>>>>> all > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should > become > >>>>>>>>>>>> FirstOffset > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset > is > >>>> just > >>>>>>>>>> used > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>> record batches, which is already a known concept. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Please let me know if you agree. 
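(A rough Java-style sketch of the 0-to-1 validation described in 42.2 above. The Persister method names and surrounding types are invented; only the RPC names ReadShareGroupOffsetsState and ReadShareGroupState, and the FENCED_STATE_EPOCH backstop, come from the discussion.)

    // Pseudocode with invented names: share-partition leader reacting to its
    // share-session count going from 0 to 1.
    void onFirstShareSession() {
        // Cheap summary call: has the state epoch moved on while the group was empty?
        var summary = persister.readShareGroupOffsetsState(groupId, topicIdPartition);
        if (summary.stateEpoch() != cachedStateEpoch) {
            // Offsets were altered while the group was empty; re-read the full state
            // and adopt the new state epoch before serving fetches.
            var state = persister.readShareGroupState(groupId, topicIdPartition);
            cachedStateEpoch = state.stateEpoch();
            resetInFlightState(state);
        }
        // Anything that still slips through is caught by the last-resort fencing:
        // WriteShareGroupState returns FENCED_STATE_EPOCH and triggers the same re-read.
    }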
> >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level > >> index > >>>>>> for > >>>>>>>>>>>>>> protocol > >>>>>>>>>>>>>>>>>>>>>> changes. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users > >> will > >>>> be > >>>>>>>>>> using > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> console share consumer. When I use the console > >>>>>> consumer, I > >>>>>>>>>>>>>> always > >>>>>>>>>>>>>>>>>> get > >>>>>>>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default > >> group > >>>>>> ID > >>>>>>>>>> for > >>>>>>>>>>>>>>>>>> console > >>>>>>>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to > match > >>>> the > >>>>>>>>>>>> console > >>>>>>>>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>>>>>> better and give more of an idea where this > >> mysterious > >>>>>>>> group > >>>>>>>>>>>> has > >>>>>>>>>>>>>>>> come > >>>>>>>>>>>>>>>>>>>>>> from. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use > >>>>>> compaction > >>>>>>>>>> and > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better > course > >>>> for > >>>>>>>>>>>> KIP-932. > >>>>>>>>>>>>>>>>>>>>>>>> Personally, > >>>>>>>>>>>>>>>>>>>>>>>> until I’ve written it down and lived with the > ideas > >>>> for > >>>>>> a > >>>>>>>>>> few > >>>>>>>>>>>>>>>> days, > >>>>>>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>>>>>>>> won’t be > >>>>>>>>>>>>>>>>>>>>>>>> able to choose which I prefer. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by > the > >>>> end > >>>>>> of > >>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>> week. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs > >>>>>> matches > >>>>>>>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs > >> from > >>>>>>>>>> KIP-848. > >>>>>>>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The > >>>>>>>>>> ShareGroupHeartbeatResponse > >>>>>>>>>>>>>> was > >>>>>>>>>>>>>>>>>>>>>>>> originally > >>>>>>>>>>>>>>>>>>>>>>>> created from KIP-848. This part of the schema does > >> not > >>>>>>>> apply > >>>>>>>>>>>>>> and I > >>>>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>>>> removed > >>>>>>>>>>>>>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to > >>>>>> simply > >>>>>>>>>>>>>>>>>>>>>> TopicPartitions > >>>>>>>>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>>>> aligns with the actual definition of > >>>>>>>>>>>>>>>> ConsumerGroupHeartbeatResponse. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error > >> codes > >>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to > >> these > >>>>>> RPCs > >>>>>>>>>> as > >>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>>>>> suggest. > >>>>>>>>>>>>>>>>>>>>>>>> It’s a good improvement for problem determination. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain. 
> Actually, > >> 1 > >>>> is > >>>>>>>>>>>>>> Acquired > >>>>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>>>> is not persisted, and I have another non-terminal > >>>> state > >>>>>> in > >>>>>>>>>>>> mind > >>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of > >> whether > >>>>>>>>>>>>>>>>>>>> OffsetAndMetadata > >>>>>>>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional > >>>>>> Metadata > >>>>>>>>>> part > >>>>>>>>>>>>>>>> comes > >>>>>>>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>>>>> a regular consumer commits offsets. However, it is > >>>>>> correct > >>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>>> never be metadata with a share group. I have > changed > >>>>>>>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this > >>>>>> process > >>>>>>>>>>>> that > >>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>>>>>>>> bump > >>>>>>>>>>>>>>>>>>>>>>>> can be a logical not just a physical change to the > >>>>>> schema. > >>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>>>> updated. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all > >> of > >>>>>> the > >>>>>>>>>>>> states > >>>>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>>> point. > >>>>>>>>>>>>>>>>>>>>>>>> I think there is definitely scope for another KIP > >>>>>> focused > >>>>>>>> on > >>>>>>>>>>>>>>>>>>>>>> administration > >>>>>>>>>>>>>>>>>>>>>>>> of share groups that might want this information > so > >>>>>>>> someone > >>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>> build > >>>>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>>>> UI and other tools on top. Doing that well is > quite > >> a > >>>>>> lot > >>>>>>>> of > >>>>>>>>>>>>>> work > >>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>>>>> own right > >>>>>>>>>>>>>>>>>>>>>>>> so I would prefer not to do that now. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 109.Yes, they’re inconsistent and follow the > >> consumer > >>>>>>>> groups > >>>>>>>>>>>>>>>>>>>> equivalents > >>>>>>>>>>>>>>>>>>>>>>>> which are also inconsistent. In general, the > >>>>>>>>>>>> KafkaAdmin.delete…. > >>>>>>>>>>>>>>>>>>>> Methods > >>>>>>>>>>>>>>>>>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like > >>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain > >> consistent > >>>>>> with > >>>>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>>>> groups? > >>>>>>>>>>>>>>>>>>>>>>>> Happy to change it. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level > >> of > >>>>>>>> detail > >>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>> epochs. > >>>>>>>>>>>>>>>>>>>>>>>> The MemberDescription does include the assignment, > >> but > >>>>>> not > >>>>>>>>>> the > >>>>>>>>>>>>>>>> list > >>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>> subscribed topic names. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing > >>>> because > >>>>>>>>>>>> there’s > >>>>>>>>>>>>>>>> no > >>>>>>>>>>>>>>>>>>>>>>>> single class that includes the states of all group > >>>>>> types. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 112. 
I think it’s good practice for the API to > have > >>>>>>>>>>>>>>>>>>>>>>>> ListShareGroupOffsetSpec. > >>>>>>>>>>>>>>>>>>>>>>>> It makes evolution and extension of the API much > >>>> easier. > >>>>>>>>>> Also, > >>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>> matches > >>>>>>>>>>>>>>>>>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResults.errors() is the > >> same. I > >>>>>>>> think > >>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>> just > >>>>>>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>>>> to look in the exception details and the same > >> pattern > >>>> is > >>>>>>>>>> being > >>>>>>>>>>>>>>>>>>>> followed > >>>>>>>>>>>>>>>>>>>>>>>> here. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to > writing > >> a > >>>>>>>>>> proposal > >>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> persist > >>>>>>>>>>>>>>>>>>>>>>>> share-group state that doesn’t use log compaction. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine > Olshan > >> on > >>>>>>>>>>>>>>>>>> read-committed > >>>>>>>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the detailed review. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao > >>>>>>>> <j...@confluent.io.INVALID > >>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 41. > >>>>>>>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the > group > >>>>>> epoch > >>>>>>>>>> to > >>>>>>>>>>>>>> set > >>>>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch? > >>>>>>>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group > >>>> coordinator > >>>>>>>>>>>>>>>> initialize > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It > >> seems > >>>>>>>>>> simpler > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> partition leader initialize both the state and > the > >>>> SPSO > >>>>>>>>>>>>>> together? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 42. > >>>>>>>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be > >>>> bumped > >>>>>>>> when > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>>>>> offset is altered." > >>>>>>>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group > offsets > >>>> says > >>>>>>>>>> "The > >>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with > >> the > >>>>>> new > >>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>> epoch > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> __share_group_state topic." So, which is > correct? > >> We > >>>>>>>> have > >>>>>>>>>>>> two > >>>>>>>>>>>>>>>>>> paths > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>>> update the state in the share coordinator, one > from > >>>> the > >>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>>>>>>>>> and another from the partition leader. 
I thought > >> the > >>>>>>>>>> benefit > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> bumping > >>>>>>>>>>>>>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>>>>>>>>>> the epoch is to fence off a late request in the > >>>>>> previous > >>>>>>>>>>>> epoch > >>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>>>> another > >>>>>>>>>>>>>>>>>>>>>>>>> path. > >>>>>>>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share > >>>> group > >>>>>>>>>> offset > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>>>>> coordinator, how does the partition leader know > the > >>>>>> share > >>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>>>>>>>>>>> been altered so that it could clear its in-memory > >>>>>> state? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base > >>>> offset > >>>>>>>> for > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> batch > >>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and > >> matches > >>>>>>>>>>>>>> LastOffset. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in > the > >>>> top > >>>>>>>>>> level > >>>>>>>>>>>>>>>> index > >>>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>>> Kafka protocol changes? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based > >>>>>> expiration > >>>>>>>>>>>>>> later. > >>>>>>>>>>>>>>>>>> But > >>>>>>>>>>>>>>>>>>>>>>>> using > >>>>>>>>>>>>>>>>>>>>>>>>> a default group in console-share-consumer > probably > >>>>>> won't > >>>>>>>>>> help > >>>>>>>>>>>>>>>>>> reduce > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> garbage. In the common case, the user of the > >> console > >>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>> likely > >>>>>>>>>>>>>>>>>>>>>>>> wants > >>>>>>>>>>>>>>>>>>>>>>>>> to see the recently produced records for > >>>> verification. > >>>>>> If > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>> default > >>>>>>>>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>>>>> doesn't provide that (because of the stale > state), > >>>> the > >>>>>>>> user > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>> likely > >>>>>>>>>>>>>>>>>>>>>>>>> just use a new group. It's true that we don't > >> garbage > >>>>>>>>>> collect > >>>>>>>>>>>>>>>> idle > >>>>>>>>>>>>>>>>>>>>>>>> topics. > >>>>>>>>>>>>>>>>>>>>>>>>> However, the share groups are similar to > >> consumers, > >>>>>>>> which > >>>>>>>>>>>> does > >>>>>>>>>>>>>>>>>>>> support > >>>>>>>>>>>>>>>>>>>>>>>>> garbage collection. Typically, we read topics > more > >>>> than > >>>>>>>>>>>>>> creating > >>>>>>>>>>>>>>>>>>>> them. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we > >>>> could > >>>>>>>> use > >>>>>>>>>>>>>>>>>>>> customized > >>>>>>>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems > >>>>>> cleaner > >>>>>>>>>> and > >>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups > >> fit > >>>> in > >>>>>>>>>>>> memory, > >>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>>>>> generate snapshots more efficiently than going > >>>> through > >>>>>>>>>>>>>>>> compaction. > >>>>>>>>>>>>>>>>>>>>>>>> Having a > >>>>>>>>>>>>>>>>>>>>>>>>> separate log per share partition is probably too > >> much > >>>>>>>>>>>> overhead. 
> >>>>>>>>>>>>>>>>>> It's > >>>>>>>>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>>>> efficient to put the state changes for multiple > >> share > >>>>>>>>>>>>>> partitions > >>>>>>>>>>>>>>>>>> in a > >>>>>>>>>>>>>>>>>>>>>>>>> single log. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 100. > ShareGroupHeartbeatRequest.RebalanceTimeoutMs: > >>>>>>>> Should > >>>>>>>>>> we > >>>>>>>>>>>>>>>> name > >>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>>> SessionTimeoutMs? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 101. > ShareGroupHeartbeatResponse.Assignment.Error: > >>>> What > >>>>>>>>>> kind > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> error > >>>>>>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>>>>> we have when assigning partitions? What are the > >>>>>>>>>> corresponding > >>>>>>>>>>>>>>>> error > >>>>>>>>>>>>>>>>>>>>>>>> codes? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 102. Do we still need > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> > ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for > >>>>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode > >> and > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the > errorCode > >> in > >>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse, > >>>>>>>>>>>>>>>>>>>>>>>>> ShareAcknowledgeResponse, > >>>> ReadShareGroupStateResponse, > >>>>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse, > >>>>>>>>>> DeleteShareGroupStateResponse, > >>>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and > >>>>>>>>>>>>>>>>>>>>>> InitializeShareGroupStateResponse? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 105. "about": "The state - > >>>>>>>>>> 0:Available,2:Acked,4:Archived.": > >>>>>>>>>>>>>> What > >>>>>>>>>>>>>>>>>>>>>> about 1 > >>>>>>>>>>>>>>>>>>>>>>>>> and 3? Are we leaving them out intentionally? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in > >>>> OffsetAndMetadata? > >>>>>>>> If > >>>>>>>>>>>> not, > >>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>>>> remove it from AdminClient and > KafkaShareConsumer? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the > >> version > >>>>>>>> since > >>>>>>>>>>>> it > >>>>>>>>>>>>>>>> now > >>>>>>>>>>>>>>>>>>>>>>>> supports > >>>>>>>>>>>>>>>>>>>>>>>>> a new group type "share"? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it > >>>>>> expose > >>>>>>>>>> all > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> states > >>>>>>>>>>>>>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just > >>>> SPSO? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes > >>>>>>>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final > >>>>>>>>>> TopicPartition > >>>>>>>>>>>>>>>>>>>>>> partition) > >>>>>>>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes > >>>>>>>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>> > >> deletedGroups() > >>>>>>>>>>>>>>>>>>>>>>>>> Should we make them more consistent? 
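(For comparison, a sketch of how a caller ends up using the two result shapes quoted in 109, assuming the KIP's proposed classes. The Admin method names and signatures below are assumptions for the example; none of this is in released Kafka at the time of this thread.)

    TopicPartition tp = new TopicPartition("T1", 0);

    // Per-partition future looked up via a method on the result object.
    DeleteShareGroupOffsetsResult offsets = admin.deleteShareGroupOffsets("SG1", Set.of(tp));
    offsets.partitionResult(tp).get();

    // Per-group future fetched out of a Map keyed by group id.
    DeleteShareGroupsResult groups = admin.deleteShareGroups(List.of("SG1"));
    groups.deletedGroups().get("SG1").get();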
> >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields > >> like > >>>>>>>>>>>>>> GroupEpoch, > >>>>>>>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and > >>>> SubscribedTopicNames? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could > we > >>>>>> just > >>>>>>>>>> use > >>>>>>>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we > know > >>>>>> which > >>>>>>>>>>>> group > >>>>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>>>>>>>> error? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi David, > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the > >>>> Share > >>>>>>>>>>>>>>>> Coordinator > >>>>>>>>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>>>>>>>>>> RPCs. > >>>>>>>>>>>>>>>>>>>>>>>>>> In the general case, it’s an inter-broker call. > It > >>>> is > >>>>>>>>>>>> probably > >>>>>>>>>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>>>> optimise > >>>>>>>>>>>>>>>>>>>>>>>>>> for the situation in which the appropriate GC > and > >> SC > >>>>>>>>>> shards > >>>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>>>>>>>> co-located, but the > >>>>>>>>>>>>>>>>>>>>>>>>>> KIP does not delve that deep into potential > >>>>>> performance > >>>>>>>>>>>>>>>>>>>> optimisations. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, > but > >> I > >>>> do > >>>>>>>>>> worry > >>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>>>>>> retrospectively > >>>>>>>>>>>>>>>>>>>>>>>>>> introducing a naming convention for groups. I > feel > >>>>>> that > >>>>>>>>>>>> naming > >>>>>>>>>>>>>>>>>>>>>>>> conventions > >>>>>>>>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>>>>> typically be the responsibility of the cluster > >>>>>>>>>>>> administrators > >>>>>>>>>>>>>>>>>> based > >>>>>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>>>>>>>> organizational > >>>>>>>>>>>>>>>>>>>>>>>>>> factors, such as the name of an application. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID > >>>> because > >>>>>>>> the > >>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>> ID > >>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>>>>> correct but > >>>>>>>>>>>>>>>>>>>>>>>>>> the group is the wrong type. The nearest > existing > >>>>>> error > >>>>>>>>>> code > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> gets > >>>>>>>>>>>>>>>>>>>>>>>>>> that across > >>>>>>>>>>>>>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is > >>>> really > >>>>>>>>>>>> showing > >>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>>>>>>>>>> error code would be better. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have > >> removed > >>>>>>>> them. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 74. 
The metadata is re-evaluated on every > change, > >>>> but > >>>>>>>>>> only a > >>>>>>>>>>>>>>>>>> subset > >>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>>>>> relevant > >>>>>>>>>>>>>>>>>>>>>>>>>> for rebalancing. A check is done against the > names > >>>> of > >>>>>>>> the > >>>>>>>>>>>>>>>>>> subscribed > >>>>>>>>>>>>>>>>>>>>>>>>>> topics to > >>>>>>>>>>>>>>>>>>>>>>>>>> see if any relevant changes may have occurred. > >> Then > >>>>>> the > >>>>>>>>>>>>>> changes > >>>>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>>>>>> trigger > >>>>>>>>>>>>>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change > >> in > >>>>>>>>>>>>>> partitions, > >>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>>>> rack > >>>>>>>>>>>>>>>>>>>>>>>>>> IDs for the > >>>>>>>>>>>>>>>>>>>>>>>>>> replicas. I have updated the KIP to make this > more > >>>>>>>> clear. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it > is > >>>> much > >>>>>>>>>> less > >>>>>>>>>>>>>>>>>>>> important > >>>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> assignment survives a GC change. There’s no need > >> to > >>>>>>>>>> transfer > >>>>>>>>>>>>>>>>>>>>>> partitions > >>>>>>>>>>>>>>>>>>>>>>>>>> safely from > >>>>>>>>>>>>>>>>>>>>>>>>>> member to member in the way that is required for > >>>>>>>> consumer > >>>>>>>>>>>>>>>> groups, > >>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>>>>>>>>> optimisation, the assignments for a share group > >> are > >>>>>> not > >>>>>>>>>>>>>>>> persisted. > >>>>>>>>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>>>>>>>>>>>> wouldn’t do any > >>>>>>>>>>>>>>>>>>>>>>>>>> harm, but it just seems unnecessary. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to > >>>> acknowledge > >>>>>> a > >>>>>>>>>>>> record > >>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>> now > >>>>>>>>>>>>>>>>>>>>>>>>>> longer > >>>>>>>>>>>>>>>>>>>>>>>>>> has the right to acknowledge, the > >>>> INVALID_RECORD_STATE > >>>>>>>>>> error > >>>>>>>>>>>>>>>> code > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>>> used. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> If the application uses the > >>>>>>>> KafkaShareConsumer.commitSync > >>>>>>>>>>>>>>>> method, > >>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>>>>> see an InvalidRecordState exception returned. > >>>>>>>>>> Alternatively, > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> application can > >>>>>>>>>>>>>>>>>>>>>>>>>> register an acknowledgement commit callback > which > >>>> will > >>>>>>>> be > >>>>>>>>>>>>>> called > >>>>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> status > >>>>>>>>>>>>>>>>>>>>>>>>>> of the acknowledgements that have succeeded or > >>>> failed. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with > the > >>>>>>>> durable > >>>>>>>>>>>>>>>>>>>>>>>> share-partition > >>>>>>>>>>>>>>>>>>>>>>>>>> state in this > >>>>>>>>>>>>>>>>>>>>>>>>>> KIP. The significant choices I made are that: > >>>>>>>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is > replicated > >>>>>>>> between > >>>>>>>>>>>>>>>> brokers. > >>>>>>>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the > >>>> storage. > >>>>>>>>>>>>>>>>>>>>>>>>>> * Only one topic is required. 
> >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for > this > >>>> kind > >>>>>>>> of > >>>>>>>>>>>>>> data, > >>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>>>>>> evidenced by > >>>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex technique I employed. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to > >>>> improve > >>>>>>>>>> upon > >>>>>>>>>>>>>> it. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log > >>>>>>>> compactor > >>>>>>>>>>>> for > >>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> topic > >>>>>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and > >>>>>> ShareDelta > >>>>>>>>>>>>>>>> records. > >>>>>>>>>>>>>>>>>>>>>>>>>> Essentially, > >>>>>>>>>>>>>>>>>>>>>>>>>> for each share-partition, the latest > >> ShareCheckpoint > >>>>>> and > >>>>>>>>>> any > >>>>>>>>>>>>>>>>>>>>>> subsequent > >>>>>>>>>>>>>>>>>>>>>>>>>> ShareDelta > >>>>>>>>>>>>>>>>>>>>>>>>>> records must not be cleaned. Anything else can > be > >>>>>>>> cleaned. > >>>>>>>>>>>> We > >>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>> then > >>>>>>>>>>>>>>>>>>>>>>>>>> be sure > >>>>>>>>>>>>>>>>>>>>>>>>>> that multiple ShareDelta records with the same > key > >>>>>> would > >>>>>>>>>>>>>> survive > >>>>>>>>>>>>>>>>>>>>>>>> cleaning > >>>>>>>>>>>>>>>>>>>>>>>>>> and we could > >>>>>>>>>>>>>>>>>>>>>>>>>> abandon the DeltaIndex technique. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per > >>>>>>>> share-partition. > >>>>>>>>>>>>>> Let’s > >>>>>>>>>>>>>>>>>>>>>> imagine > >>>>>>>>>>>>>>>>>>>>>>>>>> that we had > >>>>>>>>>>>>>>>>>>>>>>>>>> a share-state topic per topic being consumed in > a > >>>>>> share > >>>>>>>>>>>> group, > >>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> same number > >>>>>>>>>>>>>>>>>>>>>>>>>> of partitions as the topic being consumed. We > >> could > >>>>>>>> write > >>>>>>>>>>>> many > >>>>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>>> deltas > >>>>>>>>>>>>>>>>>>>>>>>>>> between > >>>>>>>>>>>>>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints > to > >>>>>> keep > >>>>>>>>>>>>>> control > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> storage used. > >>>>>>>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use > >>>>>>>>>>>>>>>>>>>>>>>> KafkaAdmin.deleteRecords() > >>>>>>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>>>> prune all of the older records. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous, > but > >>>> we > >>>>>>>> are > >>>>>>>>>>>>>>>> talking > >>>>>>>>>>>>>>>>>>>> one > >>>>>>>>>>>>>>>>>>>>>>>> per > >>>>>>>>>>>>>>>>>>>>>>>>>> topic > >>>>>>>>>>>>>>>>>>>>>>>>>> per share group that it’s being consumed in. > These > >>>>>>>> topics > >>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>>> compacted. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister > interface > >> is > >>>>>>>>>>>> intended > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>>> pluggable one day. > >>>>>>>>>>>>>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. 
It > >> seems > >>>>>>>> likely > >>>>>>>>>>>> to > >>>>>>>>>>>>>> me > >>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>> future KIPs will > >>>>>>>>>>>>>>>>>>>>>>>>>> improve upon it. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to > >>>>>> change > >>>>>>>>>> this > >>>>>>>>>>>>>>>> KIP. > >>>>>>>>>>>>>>>>>>>>>> While > >>>>>>>>>>>>>>>>>>>>>>>>>> option (a) is > >>>>>>>>>>>>>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack > to > >>>>>> have > >>>>>>>> a > >>>>>>>>>>>>>>>>>> customised > >>>>>>>>>>>>>>>>>>>>>> log > >>>>>>>>>>>>>>>>>>>>>>>>>> compactor > >>>>>>>>>>>>>>>>>>>>>>>>>> just for this topic. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State > is > >>>>>>>>>>>> overloaded. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 79. See (77). > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur < > >>>>>>>>>>>>>>>> david.art...@confluent.io > >>>>>>>>>>>>>>>>>>>>>>>> .INVALID> > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty > >>>> exciting > >>>>>>>>>>>> effort. > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still > >> trying > >>>> to > >>>>>>>>>> grok > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> whole > >>>>>>>>>>>>>>>>>>>>>>>>>> thing. > >>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry if some of my questions are basic :) > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Concepts: > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with > >> the > >>>>>>>> Share > >>>>>>>>>>>>>>>>>>>> Coordinator > >>>>>>>>>>>>>>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>>>>>>>>>>> RPC or directly in-process? > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular > >>>>>>>> consumer > >>>>>>>>>>>>>>>> groups, > >>>>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the > >>>>>>>> operator > >>>>>>>>>>>>>>>> defines > >>>>>>>>>>>>>>>>>>>>>> "sg_" > >>>>>>>>>>>>>>>>>>>>>>>>>> as a > >>>>>>>>>>>>>>>>>>>>>>>>>>> prefix for share groups only, and if a regular > >>>>>> consumer > >>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>> tries > >>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>>>>>>>>>> that name it fails. > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group, > >> or > >>>> a > >>>>>>>>>> share > >>>>>>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>>>>>> tries > >>>>>>>>>>>>>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID > >> make > >>>>>>>> more > >>>>>>>>>>>>>> sense > >>>>>>>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL? > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> -------- > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Share Group Membership: > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 73. What goes in the Metadata field for > >>>>>>>>>>>>>> TargetAssignment#Member > >>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>>> Assignment? > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 74. 
Under Trigger a rebalance, it says we rebalance when the partition metadata changes. Would this be for any change, or just certain ones? For example, if a follower drops out of the ISR and comes back, we probably don't need to rebalance.
> >>>>>>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator does *not* persist the assignment" Can you explain why this is not needed?
> >>>>>>>>>>>>>>>>>>>>>>>>>> 76. "If the consumer just failed to heartbeat due to a temporary pause, it could in theory continue to fetch and acknowledge records. When it finally sends a heartbeat and realises it’s been kicked out of the group, it should stop fetching records because its assignment has been revoked, and rejoin the group."
> >>>>>>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still deliver some buffered records, but if the share group coordinator has expired its session, it wouldn't accept acknowledgments for that share consumer. In such a case, is any kind of error raised to the application like "hey, I know we gave you these records, but really we shouldn't have"?
> >>>>>>>>>>>>>>>>>>>>>>>>>> -----
> >>>>>>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement
> >>>>>>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is written at least every so often, could we add a new log compactor that avoids compacting ShareDelta-s that are still "active" (i.e., not yet superseded by a new ShareCheckpoint)? Mechanically, this could be done by keeping the LSO no greater than the oldest "active" ShareCheckpoint.
> >>>>>> This > >>>>>>>>>>>> might > >>>>>>>>>>>>>>>> let > >>>>>>>>>>>>>>>>>> us > >>>>>>>>>>>>>>>>>>>>>>>>>> remove > >>>>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex thing. > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the > >>>> ShareDelta/Checkpoint > >>>>>>>>>>>>>> records, > >>>>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>>>>>>> MessageState? (State is kind of > >>>> overloaded/ambiguous) > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current > >>>>>>>> persistence > >>>>>>>>>>>>>> model > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>>>>>>>>> the share state is stored in one topic. It > seems > >>>> like > >>>>>>>> we > >>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>> going > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>>>> storing a lot more state than we do in > >>>>>>>> __consumer_offsets > >>>>>>>>>>>>>> since > >>>>>>>>>>>>>>>>>>>> we're > >>>>>>>>>>>>>>>>>>>>>>>>>>> dealing with message-level acks. With > aggressive > >>>>>>>>>>>>>> checkpointing > >>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>>>> compaction, we can mitigate the storage > >>>> requirements, > >>>>>>>> but > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> throughput > >>>>>>>>>>>>>>>>>>>>>>>>>>> could be a limiting factor. Have we considered > >>>> other > >>>>>>>>>>>>>>>>>> possibilities > >>>>>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>>>>> persistence? > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>>>>>>>>>> David > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>> > >>>>>> > >>>> > >>>> > >> > >> > >