Hi,

77. I’ve updated the KIP to use log retention rather than log compaction. The basic ideas of what to persist are unchanged. It makes a few changes:
* It changes the record names: ShareCheckpoint -> ShareSnapshot and ShareDelta -> ShareUpdate. They’re equivalent, but renaming makes it simple to check that I switched over to the new proposal completely.
* It uses log retention and explicit pruning of old records using ReplicaManager.deleteRecords (a rough sketch of the pruning idea follows below).
* It gets rid of the nasty DeltaIndex scheme because we no longer need to worry about the log compactor and key uniqueness.

I have also changed the ambiguous “State” to “DeliveryState” in the RPCs and records, and added a clarification about how the “group.type” configuration should be used.
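To make the pruning idea concrete, here is a rough sketch using the public Admin API’s deleteRecords call. It is an illustration only: the KIP prunes with the broker-internal ReplicaManager.deleteRecords, and the partition, offset bookkeeping and the assumption of a single share-partition per topic-partition below are mine, not taken from the KIP.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class ShareStatePruneSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                // Once a new ShareSnapshot has been written, the records before it are no
                // longer needed to rebuild the state, so they can be pruned. For simplicity,
                // assume one share-partition's state lives in this topic-partition.
                TopicPartition statePartition =
                    new TopicPartition("__share_group_state", 0); // illustrative partition
                long latestSnapshotOffset = 12345L;               // tracked by the share coordinator

                admin.deleteRecords(
                    Map.of(statePartition, RecordsToDelete.beforeOffset(latestSnapshotOffset))
                ).all().get();
                // Log retention then bounds anything that pruning has not yet removed.
            }
        }
    }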
Thanks,
Andrew

> On 10 Apr 2024, at 15:33, Andrew Schofield <andrew_schofield_j...@live.com> wrote:
>
> Hi Jun,
> Thanks for your questions.
>
> 41.
> 41.1. The partition leader obtains the state epoch in the response from ReadShareGroupState. When it becomes a share-partition leader, it reads the share-group state and one of the things it learns is the current state epoch. Then it uses the state epoch in all subsequent calls to WriteShareGroupState. The fencing is to prevent writes for a previous state epoch, which are very unlikely but which would mean that a leader was using an out-of-date epoch and was likely no longer the current leader at all, perhaps due to a long pause for some reason.
>
> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need to discover the initial offset? I’m trying to avoid yet another inter-broker hop.
>
> 42.
> 42.1. I think I’ve confused things. When the share group offset is altered using AdminClient.alterShareGroupOffsets, the group coordinator WILL update the state epoch. I don’t think it needs to update the group epoch at the same time (although it could) because the group epoch will have been bumped when the group became empty. If the share group offset is altered multiple times while the group remains empty, it would be harmless if the same state epoch were reused to initialize the state.
>
> When the share-partition leader updates the SPSO as a result of the usual flow of record delivery, it does not update the state epoch.
>
> 42.2. The share-partition leader will notice the alteration because, when it issues WriteShareGroupState, the response will contain the error code FENCED_STATE_EPOCH. This is supposed to be the last-resort way of catching this.
>
> When the share-partition leader handles its first ShareFetch request, it learns the state epoch from the response to ReadShareGroupState.
>
> In normal running, the state epoch will remain constant but, when there are no consumers and the group is empty, it might change. As a result, I think it would be sensible, when the number of share sessions transitions from 0 to 1 (which is a reasonable proxy for the share group transitioning from empty to non-empty), for the share-partition leader to issue ReadShareGroupOffsetsState to validate the state epoch. If its state epoch is out of date, it can then use ReadShareGroupState to re-initialize.
>
> I’ve changed the KIP accordingly.
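As an illustrative aside (not from the KIP; the class and method names are hypothetical), the fencing described in 41.1 and 42.2 boils down to a state-epoch comparison on the share coordinator:

    // Hypothetical sketch of state-epoch fencing for WriteShareGroupState.
    final class ShareStateEpochFence {
        private int currentStateEpoch; // epoch stored alongside the share-partition state

        ShareStateEpochFence(int initialStateEpoch) {
            this.currentStateEpoch = initialStateEpoch;
        }

        // Returns true if the write may proceed. A false return means the coordinator
        // responds with FENCED_STATE_EPOCH, telling a stale share-partition leader
        // (for example, one that paused for a long time) to re-read the state with
        // ReadShareGroupState before writing again.
        synchronized boolean acceptWrite(int requestStateEpoch) {
            return requestStateEpoch >= currentStateEpoch;
        }
    }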
> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have a clear view of which is the correct term. Having reviewed all of the instances, my view is that BaseOffset should become FirstOffset in ALL schemas defined in the KIP. Then, BaseOffset is just used in record batches, which is already a known concept.
>
> Please let me know if you agree.
>
> 60. I’ve added FindCoordinator to the top-level index for protocol changes.
>
> 61. OK. I expect you are correct about how users will be using the console share consumer. When I use the console consumer, I always get a new consumer group. I have changed the default group ID for the console share consumer to “console-share-consumer” to match the console consumer better and give more of an idea where this mysterious group has come from.
>
> 77. I will work on a proposal that does not use compaction and we can make a judgement about whether it’s a better course for KIP-932. Personally, until I’ve written it down and lived with the ideas for a few days, I won’t be able to choose which I prefer.
>
> I should be able to get the proposal written by the end of this week.
>
> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848. I prefer to maintain the consistency.
>
> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was originally created from KIP-848. This part of the schema does not apply and I have removed it. I have also renamed AssignedTopicPartitions to simply TopicPartitions, which aligns with the actual definition of ConsumerGroupHeartbeatResponse.
>
> 102. No, I don’t think we do. Removed.
>
> 103. I’ve changed the description for the error codes for ShareFetchResponse.
>
> 104. Interesting. I have added ErrorMessage fields to these RPCs as you suggest. It’s a good improvement for problem determination.
>
> 105. The values are reserved in my brain. Actually, 1 is Acquired, which is not persisted, and I have another non-terminal state in mind for 3.
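As an illustrative aside (this enum is not defined by the KIP), the numbering in 105 works out like this, with 1 being the in-memory-only Acquired state and 3 held back for a future non-terminal state:

    // Illustrative only: the delivery-state numbering described above.
    enum DeliveryState {
        AVAILABLE((byte) 0),
        ACQUIRED((byte) 1),     // in-memory only, never persisted
        ACKED((byte) 2),
        // 3 is reserved for a future non-terminal state
        ARCHIVED((byte) 4);

        private final byte id;

        DeliveryState(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }
    }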
> 106. A few people have raised the question of whether OffsetAndMetadata is sensible in this KIP, given that the optional Metadata part comes from when a regular consumer commits offsets. However, it is correct that there will never be metadata with a share group. I have changed the KIP to replace OffsetAndMetadata with Long.
>
> 107. Yes, you are right. I have learnt during this process that a version bump can be a logical, not just a physical, change to the schema. KIP updated.
>
> 108. I would prefer not to extend this RPC for all of the states at this point. I think there is definitely scope for another KIP focused on administration of share groups that might want this information so someone could build a UI and other tools on top. Doing that well is quite a lot of work in its own right, so I would prefer not to do that now.
>
> 109. Yes, they’re inconsistent and follow the consumer group equivalents, which are also inconsistent. In general, the KafkaAdmin.delete… methods use the Map<XXX, KafkaFuture<YYY>> pattern, like DeleteShareGroupsResult.
>
> Would you prefer that I do that, or remain consistent with consumer groups? Happy to change it.
>
> 110. Again, consumer groups don’t yield that level of detail about epochs. The MemberDescription does include the assignment, but not the list of subscribed topic names.
>
> 111. I didn’t include GroupState in GroupListing because there’s no single class that includes the states of all group types.
>
> 112. I think it’s good practice for the API to have ListShareGroupOffsetsSpec. It makes evolution and extension of the API much easier. Also, it matches the precedent set by ListConsumerGroupOffsetsSpec.
>
> 113. ListConsumerGroupsResult.errors() is the same. I think you just have to look in the exception details, and the same pattern is being followed here.
>
> Over the next few days, I have committed to writing a proposal for how to persist share-group state that doesn’t use log compaction.
>
> I am also aware that the discussion with Justine Olshan on read-committed isolation level is not yet quite complete.
>
> Thanks for the detailed review.
>
> Andrew
>
>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
>>
>> Hi, Andrew,
>>
>> Thanks for the reply. A few more comments.
>>
>> 41.
>> 41.1 How does the partition leader obtain the group epoch to set WriteShareGroupStateRequest.StateEpoch?
>> 41.2 What's the benefit of having the group coordinator initialize the state and the partition leader set the SPSO? It seems simpler to have the partition leader initialize both the state and the SPSO together?
>>
>> 42.
>> 42.1 "I don’t think the group epoch needs to be bumped when the share group offset is altered." But the part on handling Alter share group offsets says "The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic." So, which is correct? We have two paths to update the state in the share coordinator, one from the group coordinator and another from the partition leader. I thought the benefit of bumping up the epoch is to fence off a late request in the previous epoch from another path.
>> 42.2 When the group coordinator alters the share group offset in the share coordinator, how does the partition leader know the share group state has been altered so that it could clear its in-memory state?
>>
>> 47. 56. BaseOffset typically refers to the base offset for the batch and can be confusing. FirstOffset is clearer and matches LastOffset.
>>
>> 60. Could we include FindCoordinatorRequest in the top level index for Kafka protocol changes?
>>
>> 61. I think it's probably ok to add time-based expiration later. But using a default group in console-share-consumer probably won't help reduce the garbage. In the common case, the user of the console consumer likely wants to see the recently produced records for verification. If the default group doesn't provide that (because of the stale state), the user will likely just use a new group. It's true that we don't garbage collect idle topics. However, share groups are similar to consumers, which do support garbage collection. Typically, we read topics more than we create them.
>>
>> 77. If the generic compaction is inconvenient, we could use customized logic. If we go with that route, option (b) seems cleaner and more optimized. Since the share states for all groups fit in memory, we could generate snapshots more efficiently than going through compaction. Having a separate log per share partition is probably too much overhead. It's more efficient to put the state changes for multiple share partitions in a single log.
>>
>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it SessionTimeoutMs?
>>
>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could we have when assigning partitions? What are the corresponding error codes?
>>
>> 102. Do we still need ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>>
>> 103. Could we list the error codes separately for ShareFetchResponse.Responses.Partitions.ErrorCode and ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>>
>> 104. Should we add an error message for the errorCode in ShareFetchResponse, ShareAcknowledgeResponse, ReadShareGroupStateResponse, WriteShareGroupStateResponse, DeleteShareGroupStateResponse, ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?
>>
>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1 and 3? Are we leaving them out intentionally?
>>
>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could we remove it from AdminClient and KafkaShareConsumer?
>>
>> 107. ListGroupsRequest: Should we bump up the version since it now supports a new group type "share"?
>>
>> 108. AdminClient.listShareGroupOffsets: Should it expose all the states from ReadShareGroupStateResponse, instead of just the SPSO?
>>
>> 109. DeleteShareGroupOffsetsResult exposes
>> public KafkaFuture<Void> partitionResult(final TopicPartition partition)
>> DeleteShareGroupsResult exposes
>> public Map<String, KafkaFuture<Void>> deletedGroups()
>> Should we make them more consistent?
>>
>> 110. Should ShareGroupDescription include fields like GroupEpoch, AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>>
>> 111. Should GroupListing include GroupState?
>>
>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use Set<TopicPartition> directly?
>>
>> 113. ListShareGroupsResult.errors(): How do we know which group has an error?
>>
>> Jun
>>
>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>>
>>> Hi David,
>>> Thanks for your questions.
>>>
>>> 70. The Group Coordinator communicates with the Share Coordinator over RPCs. In the general case, it’s an inter-broker call. It is probably possible to optimise for the situation in which the appropriate GC and SC shards are co-located, but the KIP does not delve that deep into potential performance optimisations.
>>>
>>> 71. Avoiding collisions would be a good idea, but I do worry about retrospectively introducing a naming convention for groups. I feel that naming conventions will typically be the responsibility of the cluster administrators based on organizational factors, such as the name of an application.
>>>
>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is correct but the group is the wrong type. The nearest existing error code that gets that across is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing that a new error code would be better.
>>>
>>> 73. The Metadata fields are not used. I have removed them.
>>>
>>> 74. The metadata is re-evaluated on every change, but only a subset is relevant for rebalancing. A check is done against the names of the subscribed topics to see if any relevant changes may have occurred. Then the changes which trigger a rebalance are topic creation, deletion, change in partitions, or rack IDs for the replicas. I have updated the KIP to make this clearer.
>>>
>>> 75. The assignment is not persisted because it is much less important that the assignment survives a GC change. There’s no need to transfer partitions safely from member to member in the way that is required for consumer groups, so as an optimisation, the assignments for a share group are not persisted. It wouldn’t do any harm, but it just seems unnecessary.
>>>
>>> 76. In the event that a consumer tries to acknowledge a record that it no longer has the right to acknowledge, the INVALID_RECORD_STATE error code is used.
>>>
>>> If the application uses the KafkaShareConsumer.commitSync method, it will see an InvalidRecordState exception returned. Alternatively, the application can register an acknowledgement commit callback which will be called with the status of the acknowledgements that have succeeded or failed.
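As an illustrative aside, the flow described in 76 looks roughly like this from the application’s side. KafkaShareConsumer, acknowledge() and commitSync() are the interfaces proposed by the KIP; the constructor, configuration, the made-up topic and group names, and the exact way the InvalidRecordState error surfaces are sketched here and may differ from the final KIP.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer; // proposed by KIP-932

    public class ShareConsumeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-share-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
                consumer.subscribe(List.of("payments"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                        consumer.acknowledge(record); // mark this record as successfully consumed
                    }
                    // Commit the acknowledgements. If the consumer has lost the right to
                    // acknowledge a record (for example, it was kicked out of the group),
                    // the broker answers INVALID_RECORD_STATE and the application sees an
                    // InvalidRecordState error here, or in the acknowledgement commit
                    // callback if one was registered instead.
                    consumer.commitSync();
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            System.out.printf("%s: %s%n", record.key(), record.value());
        }
    }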
>>> 77. I have tried to tread a careful path with the durable share-partition state in this KIP. The significant choices I made are that:
>>> * Topics are used so that the state is replicated between brokers.
>>> * Log compaction is used to keep a lid on the storage.
>>> * Only one topic is required.
>>>
>>> Log compaction as it stands is not ideal for this kind of data, as evidenced by the DeltaIndex technique I employed.
>>>
>>> I can think of a few relatively simple ways to improve upon it.
>>>
>>> a) We could use a customised version of the log compactor for this topic that understands the rules for ShareCheckpoint and ShareDelta records. Essentially, for each share-partition, the latest ShareCheckpoint and any subsequent ShareDelta records must not be cleaned. Anything else can be cleaned. We could then be sure that multiple ShareDelta records with the same key would survive cleaning and we could abandon the DeltaIndex technique.
>>>
>>> b) Actually, what is required is a log per share-partition. Let’s imagine that we had a share-state topic per topic being consumed in a share group, with the same number of partitions as the topic being consumed. We could write many more deltas between checkpoints, and just take periodic checkpoints to keep control of the storage used. Once a checkpoint has been taken, we could use KafkaAdmin.deleteRecords() to prune all of the older records.
>>>
>>> The share-state topics would be more numerous, but we are talking one per topic per share group that it’s being consumed in. These topics would not be compacted.
>>>
>>> As you’ll see in the KIP, the Persister interface is intended to be pluggable one day. I know the scheme in the KIP is not ideal. It seems likely to me that future KIPs will improve upon it.
>>>
>>> If I can get buy-in for option (b), I’m happy to change this KIP. While option (a) is probably workable, it does seem a bit of a hack to have a customised log compactor just for this topic.
>>>
>>> 78. How about DeliveryState? I agree that State is overloaded.
>>>
>>> 79. See (77).
>>>
>>> Thanks,
>>> Andrew
>>>
>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID> wrote:
>>>>
>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>>
>>>> I've finally made it through the KIP, still trying to grok the whole thing. Sorry if some of my questions are basic :)
>>>>
>>>> Concepts:
>>>>
>>>> 70. Does the Group Coordinator communicate with the Share Coordinator over RPC or directly in-process?
>>>>
>>>> 71. For preventing name collisions with regular consumer groups, could we define a reserved share group prefix? E.g., the operator defines "sg_" as a prefix for share groups only, and if a regular consumer group tries to use that name it fails.
>>>>
>>>> 72. When a consumer tries to use a share group, or a share consumer tries to use a regular group, would INVALID_GROUP_ID make more sense than INCONSISTENT_GROUP_PROTOCOL?
>>>>
>>>> --------
>>>>
>>>> Share Group Membership:
>>>>
>>>> 73. What goes in the Metadata field for TargetAssignment#Member and Assignment?
>>>>
>>>> 74. Under "Trigger a rebalance", it says we rebalance when the partition metadata changes. Would this be for any change, or just certain ones? For example, if a follower drops out of the ISR and comes back, we probably don't need to rebalance.
>>>>
>>>> 75. "For a share group, the group coordinator does *not* persist the assignment" Can you explain why this is not needed?
>>>>
>>>> 76. "If the consumer just failed to heartbeat due to a temporary pause, it could in theory continue to fetch and acknowledge records. When it finally sends a heartbeat and realises it’s been kicked out of the group, it should stop fetching records because its assignment has been revoked, and rejoin the group."
>>>>
>>>> A consumer with a long pause might still deliver some buffered records, but if the share group coordinator has expired its session, it wouldn't accept acknowledgments for that share consumer. In such a case, is any kind of error raised to the application like "hey, I know we gave you these records, but really we shouldn't have"?
>>>>
>>>> -----
>>>>
>>>> Record Delivery and acknowledgement:
>>>>
>>>> 77. If we guarantee that a ShareCheckpoint is written at least every so often, could we add a new log compactor that avoids compacting ShareDelta-s that are still "active" (i.e., not yet superseded by a new ShareCheckpoint)? Mechanically, this could be done by keeping the LSO no greater than the oldest "active" ShareCheckpoint. This might let us remove the DeltaIndex thing.
>>>>
>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how about MessageState? (State is kind of overloaded/ambiguous)
>>>>
>>>> 79. One possible limitation with the current persistence model is that all the share state is stored in one topic. It seems like we are going to be storing a lot more state than we do in __consumer_offsets since we're dealing with message-level acks. With aggressive checkpointing and compaction, we can mitigate the storage requirements, but the throughput could be a limiting factor. Have we considered other possibilities for persistence?
>>>>
>>>> Cheers,
>>>> David