Hi Jun,
Thanks for your questions.

41.1. The partition leader obtains the state epoch in the response from
ReadShareGroupState. When it becomes a share-partition leader,
it reads the share-group state and one of the things it learns is the
current state epoch. Then it uses the state epoch in all subsequent
calls to WriteShareGroupState. The fencing is to prevent writes for
a previous state epoch, which are very unlikely but which would mean
that a leader was using an out-of-date epoch and was likely no longer
the current leader at all, perhaps due to a long pause for some reason.
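
As a rough illustration of the fencing described in 41.1, here is a minimal sketch. It is not code from the KIP: the class StateEpochFenceSketch, its method names and the string key format are all hypothetical, and the real check would live in the share coordinator's handling of WriteShareGroupState.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the share coordinator's fencing check; not KIP-932 code.
class StateEpochFenceSketch {
    enum Result { NONE, FENCED_STATE_EPOCH }

    // Current state epoch per share-partition (the key format is illustrative).
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models ReadShareGroupState: the leader learns the current state epoch.
    int readStateEpoch(String sharePartition) {
        return currentEpoch.getOrDefault(sharePartition, 0);
    }

    // Models a state epoch bump, for example when the state is re-initialized.
    void bumpStateEpoch(String sharePartition) {
        currentEpoch.merge(sharePartition, 1, Integer::sum);
    }

    // Models WriteShareGroupState: a write carrying a previous epoch is rejected.
    Result write(String sharePartition, int stateEpoch) {
        int current = readStateEpoch(sharePartition);
        return stateEpoch < current ? Result.FENCED_STATE_EPOCH : Result.NONE;
    }
}
```

A leader that read epoch N keeps writing with N. Once the epoch has been bumped, its next write comes back as FENCED_STATE_EPOCH.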

41.2. If the group coordinator were to set the SPSO, wouldn’t it need
to discover the initial offset? I’m trying to avoid yet another inter-broker

42.1. I think I’ve confused things. When the share group offset is altered
using AdminClient.alterShareGroupOffsets, the group coordinator WILL
update the state epoch. I don’t think it needs to update the group epoch
at the same time (although it could) because the group epoch will have
been bumped when the group became empty. If the share group offset
is altered multiple times when the group remains empty, it would be
harmless if the same state epoch was reused to initialize the state.

When the share-partition leader updates the SPSO as a result of
the usual flow of record delivery, it does not update the state epoch.

42.2. The share-partition leader will notice the alteration because,
when it issues WriteShareGroupState, the response will contain the
error code FENCED_STATE_EPOCH. This is supposed to be the
last-resort way of catching this.

When the share-partition leader handles its first ShareFetch request,
it learns the state epoch from the response to ReadShareGroupState.

In normal running, the state epoch will remain constant, but, when there
are no consumers and the group is empty, it might change. As a result,
I think it would be sensible when the set of share sessions transitions
from 0 to 1, which is a reasonable proxy for the share group transitioning
from empty to non-empty, for the share-partition leader to issue
ReadShareGroupOffsetsState to validate the state epoch. If its state
epoch is out of date, it can then ReadShareGroupState to re-initialize.
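
The 0-to-1 session check in 42.2 could be sketched roughly as follows. The class SessionTransitionSketch and its members are hypothetical. The IntSupplier stands in for a ReadShareGroupOffsetsState round trip, and the re-initialization via ReadShareGroupState is reduced to updating a cached value and a counter.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of the share-partition leader's epoch check; not KIP-932 code.
class SessionTransitionSketch {
    // Stands in for asking the share coordinator for the current state epoch.
    private final IntSupplier currentStateEpoch;
    private int cachedStateEpoch;
    private int sessionCount = 0;
    private int reinitializations = 0;

    SessionTransitionSketch(IntSupplier currentStateEpoch) {
        this.currentStateEpoch = currentStateEpoch;
        // Models the initial ReadShareGroupState, where the epoch is first learned.
        this.cachedStateEpoch = currentStateEpoch.getAsInt();
    }

    void onSessionOpened() {
        if (sessionCount == 0) {
            // 0 -> 1 sessions: proxy for the group going from empty to non-empty.
            int latest = currentStateEpoch.getAsInt();
            if (latest != cachedStateEpoch) {
                // Out of date: a real leader would re-read the full state here.
                cachedStateEpoch = latest;
                reinitializations++;
            }
        }
        sessionCount++;
    }

    void onSessionClosed() {
        if (sessionCount > 0) {
            sessionCount--;
        }
    }

    int cachedStateEpoch() { return cachedStateEpoch; }

    int reinitializations() { return reinitializations; }
}
```

The check only runs on the empty-to-non-empty transition, so in normal running it adds no extra calls.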

I’ve changed the KIP accordingly.

47, 56. If I am to change BaseOffset to FirstOffset, we need to have
a clear view of which is the correct term. Having reviewed all of the
instances, my view is that BaseOffset should become FirstOffset in
ALL schemas defined in the KIP. Then, BaseOffset is just used in
record batches, which is already a known concept.

Please let me know if you agree.

60. I’ve added FindCoordinator to the top level index for protocol changes.

61. OK. I expect you are correct about how users will be using the
console share consumer. When I use the console consumer, I always get
a new consumer group. I have changed the default group ID for console
share consumer to “console-share-consumer” to match the console consumer
better and give more of an idea where this mysterious group has come from.

77. I will work on a proposal that does not use compaction and we can
make a judgement about whether it’s a better course for KIP-932. Personally,
until I’ve written it down and lived with the ideas for a few days, I won’t be
able to choose which I prefer.

I should be able to get the proposal written by the end of this week.

100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
I prefer to maintain the consistency.

101. Thanks for catching this. The ShareGroupHeartbeatResponse was originally
created from KIP-848. This part of the schema does not apply and I have removed
it. I have also renamed AssignedTopicPartitions to simply TopicPartitions which
aligns with the actual definition of ConsumerGroupHeartbeatResponse.

102. No, I don’t think we do. Removed.

103. I’ve changed the description for the error codes for ShareFetchResponse.

104. Interesting. I have added ErrorMessages to these RPCs as you suggest.
It’s a good improvement for problem determination.

105. The values are reserved in my brain. Actually, 1 is Acquired which
is not persisted, and I have another non-terminal state in mind for 3.
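
To make the numbering in 105 concrete, here is a minimal sketch of how the values might map to an enum. The name DeliveryStateSketch and the fromPersisted helper are hypothetical. Value 3 is deliberately left undefined because the state intended for it is not named here.

```java
import java.util.Optional;

// Hypothetical enum mirroring the values discussed in 105; not KIP-932 code.
enum DeliveryStateSketch {
    AVAILABLE((byte) 0, true),
    ACQUIRED((byte) 1, false), // in-memory only, never written to the state topic
    ACKED((byte) 2, true),
    // 3 is reserved for a future non-terminal state and is intentionally absent.
    ARCHIVED((byte) 4, true);

    final byte id;
    final boolean persisted;

    DeliveryStateSketch(byte id, boolean persisted) {
        this.id = id;
        this.persisted = persisted;
    }

    // Decodes a value read from persisted state; unknown or in-memory-only
    // values yield an empty result rather than a guess.
    static Optional<DeliveryStateSketch> fromPersisted(byte id) {
        for (DeliveryStateSketch s : values()) {
            if (s.persisted && s.id == id) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }
}
```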

106. A few people have raised the question of whether OffsetAndMetadata
is sensible in this KIP, given that the optional Metadata part comes from
a regular consumer committing offsets. You are correct that there will
never be metadata with a share group, so I have changed
the KIP to replace OffsetAndMetadata with Long.

107. Yes, you are right. I have learnt during this process that a version bump
can be a logical change, not just a physical change to the schema. KIP updated.

108. I would prefer not to extend this RPC for all of the states at this point.
I think there is definitely scope for another KIP focused on administration
of share groups that might want this information so someone could build a
UI and other tools on top. Doing that well is quite a lot of work in its own
right, so I would prefer not to do that now.

109. Yes, they’re inconsistent and follow the consumer groups equivalents
which are also inconsistent. In general, the KafkaAdmin.delete… methods
use the Map<XXX, KafkaFuture<YYY>> pattern like DeleteShareGroupsResult.

Would you prefer that I do that, or remain consistent with consumer groups?
Happy to change it.
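
For reference, the Map<XXX, KafkaFuture<YYY>> pattern mentioned in 109 looks roughly like the sketch below. It is only an illustration: DeleteOffsetsResultSketch is a hypothetical class, java.util.concurrent.CompletableFuture stands in for KafkaFuture so that the example needs no Kafka dependency, and plain strings stand in for TopicPartition.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical result class following the Map<XXX, Future<YYY>> pattern.
class DeleteOffsetsResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    DeleteOffsetsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = Collections.unmodifiableMap(futures);
    }

    // One future per partition, so a caller can tell which partition failed.
    Map<String, CompletableFuture<Void>> partitionResults() {
        return futures;
    }

    // Completes normally only if every per-partition future completed normally.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(
            futures.values().toArray(new CompletableFuture[0]));
    }
}
```

Exposing the whole map lets callers iterate over every outcome, whereas a per-key accessor requires them to already know which keys to ask for.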

110. Again, consumer groups don’t yield that level of detail about epochs.
The MemberDescription does include the assignment, but not the list of
subscribed topic names.

111. I didn’t include GroupState in GroupListing because there’s no
single class that includes the states of all group types.

112. I think it’s good practice for the API to have ListShareGroupOffsetsSpec.
It makes evolution and extension of the API much easier. Also, it matches
the precedent set by ListConsumerGroupOffsetsSpec.

113. ListConsumerGroupsResult.errors() is the same. I think you just have
to look in the exception details, and the same pattern is being followed here.

Over the next few days, I have committed to writing a proposal for how to
persist share-group state that doesn’t use log compaction.

I am also aware that the discussion with Justine Olshan on read-committed
isolation level is not yet quite complete.

Thanks for the detailed review.


> Hi, Andrew,
> Thanks for the reply. A few more comments.
> 41.
> 41.1 How does the partition leader obtain the group epoch to set
> WriteShareGroupStateRequest.StateEpoch?
> 41.2 What's the benefit of having the group coordinator initialize the
> state and the partition leader set the SPSO? It seems simpler to have the
> partition leader initialize both the state and the SPSO together?
> 42.
> 42.1 "I don’t think the group epoch needs to be bumped when the share group
> offset is altered."
> But the part on handling Alter share group offsets says "The share
> coordinator writes a ShareCheckpoint record with the new state epoch to the
> __share_group_state  topic." So, which is correct? We have two paths to
> update the state in the share coordinator, one from the group coordinator
> and another from the partition leader. I thought the benefit of bumping up
> the epoch is to fence off a late request in the previous epoch from another
> path.
> 42.2 When the group coordinator alters the share group offset in share
> coordinator, how does the partition leader know the share group state has
> been altered so that it could clear its in-memory state?
> 47. 56. BaseOffset typically refers to the base offset for the batch and
> can be confusing. FirstOffset is clearer and matches LastOffset.
> 60. Could we include FindCoordinatorRequest in the top level index for
> Kafka protocol changes?
> 61. I think it's probably ok to add time-based expiration later. But using
> a default group in console-share-consumer probably won't help reduce the
> garbage. In the common case, the user of the console consumer likely wants
> to see the recently produced records for verification. If the default group
> doesn't provide that (because of the stale state), the user will likely
> just use a new group. It's true that we don't garbage collect idle topics.
> However,  the share groups are similar to consumers, which does support
> garbage collection. Typically, we read topics more than creating them.
> 77. If the generic compaction is inconvenient, we could use customized
> logic. If we go with that route, option (b) seems cleaner and more
> optimized. Since the share states for all groups fit in memory, we could
> generate snapshots more efficiently than going through compaction. Having a
> separate log per share partition is probably too much overhead. It's more
> efficient to put the state changes for multiple share partitions in a
> single log.
> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
> SessionTimeoutMs?
> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could
> we have when assigning partitions? What are the corresponding error codes?
> 102. Do we still need
> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
> 103. Could we list the error codes separately for
> ShareFetchResponse.Responses.Partitions.ErrorCode and
> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
> 104. Should we add error message for the errorCode in ShareFetchResponse,
> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
> ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?
> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1
> and 3? Are we leaving them out intentionally?
> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
> remove it from AdminClient and KafkaShareConsumer?
> 107. ListGroupsRequest: Should we bump up the version since it now supports
> a new group type "share"?
> 108. AdminClient.listShareGroupOffsets: Should it expose all the states
> from ReadShareGroupStateResponse, instead of just SPSO?
> 109. DeleteShareGroupOffsetsResult exposes
>  public KafkaFuture<Void> partitionResult(final TopicPartition partition)
> DeleteShareGroupsResult exposes
>  public Map<String, KafkaFuture<Void>> deletedGroups()
> Should we make them more consistent?
> 110. Should ShareGroupDescription include fields like GroupEpoch,
> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
> 111. Should GroupListing include GroupState?
> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
> Set<TopicPartition> directly?
> 113. ListShareGroupsResult.errors(): How do we know which group has an
> error?
> Jun
>> Hi David,
>> Thanks for your questions.
>> 70. The Group Coordinator communicates with the Share Coordinator over
>> RPCs.
>> In the general case, it’s an inter-broker call. It is probably possible to
>> optimise
>> for the situation in which the appropriate GC and SC shards are
>> co-located, but the
>> KIP does not delve that deep into potential performance optimisations.
>> 71. Avoiding collisions would be a good idea, but I do worry about
>> retrospectively
>> introducing a naming convention for groups. I feel that naming conventions
>> will
>> typically be the responsibility of the cluster administrators based on
>> organizational
>> factors, such as the name of an application.
>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
>> correct but
>> the group is the wrong type. The nearest existing error code that gets
>> that across
>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing that a new
>> error code would be better.
>> 73. The Metadata fields are not used. I have removed them.
>> 74. The metadata is re-evaluated on every change, but only a subset is
>> relevant
>> for rebalancing. A check is done against the names of the subscribed
>> topics to
>> see if any relevant changes may have occurred. Then the changes which
>> trigger
>> a rebalance are topic creation, deletion, change in partitions, or rack
>> IDs for the
>> replicas. I have updated the KIP to make this more clear.
>> 75. The assignment is not persisted because it is much less important that
>> the
>> assignment survives a GC change. There’s no need to transfer partitions
>> safely from
>> member to member in the way that is required for consumer groups, so as an
>> optimisation, the assignments for a share group are not persisted. It
>> wouldn’t do any
>> harm, but it just seems unnecessary.
>> 76. In the event that a consumer tries to acknowledge a record that it no
>> longer
>> has the right to acknowledge, the INVALID_RECORD_STATE error code is used.
>> If the application uses the KafkaShareConsumer.commitSync method, it will
>> see an InvalidRecordState exception returned. Alternatively, the
>> application can
>> register an acknowledgement commit callback which will be called with the
>> status
>> of the acknowledgements that have succeeded or failed.
>> 77. I have tried to tread a careful path with the durable share-partition
>> state in this
>> KIP. The significant choices I made are that:
>> * Topics are used so that the state is replicated between brokers.
>> * Log compaction is used to keep a lid on the storage.
>> * Only one topic is required.
>> Log compaction as it stands is not ideal for this kind of data, as
>> evidenced by
>> the DeltaIndex technique I employed.
>> I can think of a few relatively simple ways to improve upon it.
>> a) We could use a customised version of the log compactor for this topic
>> that
>> understands the rules for ShareCheckpoint and ShareDelta records.
>> Essentially,
>> for each share-partition, the latest ShareCheckpoint and any subsequent
>> ShareDelta
>> records must not be cleaned. Anything else can be cleaned. We could then
>> be sure
>> that multiple ShareDelta records with the same key would survive cleaning
>> and we could
>> abandon the DeltaIndex technique.
>> b) Actually what is required is a log per share-partition. Let’s imagine
>> that we had
>> a share-state topic per topic being consumed in a share group, with the
>> same number
>> of partitions as the topic being consumed. We could write many more deltas
>> between
>> checkpoints, and just take periodic checkpoints to keep control of the
>> storage used.
>> Once a checkpoint has been taken, we could use KafkaAdmin.deleteRecords()
>> to
>> prune all of the older records.
>> The share-state topics would be more numerous, but we are talking one per
>> topic
>> per share group that it’s being consumed in. These topics would not be
>> compacted.
>> As you’ll see in the KIP, the Persister interface is intended to be
>> pluggable one day.
>> I know the scheme in the KIP is not ideal. It seems likely to me that
>> future KIPs will
>> improve upon it.
>> If I can get buy-in for option (b), I’m happy to change this KIP. While
>> option (a) is
>> probably workable, it does seem a bit of a hack to have a customised log
>> compactor
>> just for this topic.
>> 78. How about DeliveryState? I agree that State is overloaded.
>> 79. See (77).
>> Thanks,
>> Andrew
>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>> I've finally made it through the KIP, still trying to grok the whole
>> thing.
>>> Sorry if some of my questions are basic :)
>>> Concepts:
>>> 70. Does the Group Coordinator communicate with the Share Coordinator
>> over
>>> RPC or directly in-process?
>>> 71. For preventing name collisions with regular consumer groups, could we
>>> define a reserved share group prefix? E.g., the operator defines "sg_"
>> as a
>>> prefix for share groups only, and if a regular consumer group tries to
>> use
>>> that name it fails.
>>> 72. When a consumer tries to use a share group, or a share consumer tries
>>> to use a regular group, would INVALID_GROUP_ID make more sense
>>> --------
>>> Share Group Membership:
>>> 73. What goes in the Metadata field for TargetAssignment#Member and
>>> Assignment?
>>> 74. Under Trigger a rebalance, it says we rebalance when the partition
>>> metadata changes. Would this be for any change, or just certain ones? For
>>> example, if a follower drops out of the ISR and comes back, we probably
>>> don't need to rebalance.
>>> 75. "For a share group, the group coordinator does *not* persist the
>>> assignment" Can you explain why this is not needed?
>>> 76. " If the consumer just failed to heartbeat due to a temporary pause,
>> it
>>> could in theory continue to fetch and acknowledge records. When it
>> finally
>>> sends a heartbeat and realises it’s been kicked out of the group, it
>> should
>>> stop fetching records because its assignment has been revoked, and rejoin
>>> the group."
>>> A consumer with a long pause might still deliver some buffered records,
>> but
>>> if the share group coordinator has expired its session, it wouldn't
>> accept
>>> acknowledgments for that share consumer. In such a case, is any kind of
>>> error raised to the application like "hey, I know we gave you these
>>> records, but really we shouldn't have" ?
>>> -----
>>> Record Delivery and acknowledgement
>>> 77. If we guarantee that a ShareCheckpoint is written at least every so
>>> often, could we add a new log compactor that avoids compacting
>> ShareDelta-s
>>> that are still "active" (i.e., not yet superseded by a new
>>> ShareCheckpoint). Mechanically, this could be done by keeping the LSO no
>>> greater than the oldest "active" ShareCheckpoint. This might let us
>> remove
>>> the DeltaIndex thing.
>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how about
>>> MessageState? (State is kind of overloaded/ambiguous)
>>> 79. One possible limitation with the current persistence model is that
>> all
>>> the share state is stored in one topic. It seems like we are going to be
>>> storing a lot more state than we do in __consumer_offsets since we're
>>> dealing with message-level acks. With aggressive checkpointing and
>>> compaction, we can mitigate the storage requirements, but the throughput
>>> could be a limiting factor. Have we considered other possibilities for
>>> persistence?
>>> Cheers,
>>> David

