Hi Jun,
Thanks for your reply.

147. Yes, I see what you mean. The rebalance latency will indeed
be very short by comparison with consumer groups. I have removed the
rebalance latency metrics from the client and retained the rebalance
count and rate.
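
To illustrate what is being retained, here is a rough sketch (not the
actual client code) of how a rebalance count and rate could be registered
with the Kafka common metrics classes and recorded whenever the membership
state machine decides its part of a rebalance has started. The metric and
group names below are purely illustrative; the KIP defines the real ones.

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;

public class ShareRebalanceMetricsSketch {
    private final Sensor rebalanceSensor;

    public ShareRebalanceMetricsSketch(Metrics metrics) {
        // Illustrative names; the KIP defines the actual metric names and group.
        rebalanceSensor = metrics.sensor("share-rebalance");
        rebalanceSensor.add(
            metrics.metricName("rebalance-total", "share-consumer-metrics",
                "Total number of rebalances this member has taken part in"),
            new CumulativeCount());
        rebalanceSensor.add(
            metrics.metricName("rebalance-rate-per-hour", "share-consumer-metrics",
                "Number of rebalances per hour"),
            new Rate(TimeUnit.HOURS, new WindowedCount()));
    }

    // Called by the membership state machine when a state transition marks the
    // start of this member's part of a rebalance.
    public void recordRebalanceStarted() {
        rebalanceSensor.record();
    }

    public static void main(String[] args) {
        ShareRebalanceMetricsSketch sketch = new ShareRebalanceMetricsSketch(new Metrics());
        sketch.recordRebalanceStarted();
    }
}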

150. Yes, I think so. I have tweaked the text so that the simple
assignor will take existing assignment information into account when
it has it, which should minimise unnecessary churn of the (b) partitions.
A sketch of what I have in mind follows.
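
Roughly, and only as an illustration rather than the KIP's actual
ShareGroupPartitionAssignor implementation (the class and method below are
invented for this sketch), the (b) step can keep a partition with its
existing owner when that owner is still a member, and fall back to
round-robin only for partitions nobody owned:

import java.util.*;

public class SimpleShareAssignorSketch {

    // members: member IDs in the group
    // numPartitions: partitions of the subscribed topic
    // currentAssignment: existing member -> partitions mapping, possibly empty
    static Map<String, Set<Integer>> assign(List<String> members,
                                            int numPartitions,
                                            Map<String, Set<Integer>> currentAssignment) {
        Map<String, Set<Integer>> assignment = new HashMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        Set<Integer> covered = new HashSet<>();

        // (a) Distribute the members across the partitions by hashed member ID.
        for (String member : members) {
            int partition = Math.floorMod(member.hashCode(), numPartitions);
            assignment.computeIfAbsent(member, m -> new HashSet<>()).add(partition);
            covered.add(partition);
        }

        // (b) For partitions with no members, prefer an existing owner which is
        // still a member; otherwise assign round-robin. Only genuinely unowned
        // partitions move.
        int next = 0;
        for (int partition = 0; partition < numPartitions; partition++) {
            if (covered.contains(partition)) {
                continue;
            }
            String owner = null;
            for (String member : members) {
                if (currentAssignment.getOrDefault(member, Set.of()).contains(partition)) {
                    owner = member;
                    break;
                }
            }
            if (owner == null) {
                owner = members.get(next++ % members.size());
            }
            assignment.computeIfAbsent(owner, m -> new HashSet<>()).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> current = Map.of("m1", Set.of(0, 2), "m2", Set.of(1));
        System.out.println(assign(List.of("m1", "m2"), 3, current));
    }
}

The point is only that the stability of (b) does not depend on whether the
assignments are persisted; the assignor just uses whatever existing
assignment information the group coordinator has in memory.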

158. I’ve changed it to ReadShareGroupStateSummary.

Thanks,
Andrew


> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 147. There seems to be some difference between consumer groups and share
> groups. In the consumer groups, if a client receives a heartbeat response
> to revoke some partitions, it may have to commit offsets before revoking
> partitions or it may have to call the rebalance callbacks provided by the
> user. This may take some time and can be reflected in the rebalance time
> metric. In the share groups, none of that exists. If a client receives some
> added/revoked partitions, it accepts them immediately, right? So, does that
> practically make the rebalance time always 0?
> 
> 150. I guess in the common case, there will be many more members than
> partitions. So the need for (b) will be less common. We can probably leave
> the persisting of the assignment out for now.
> 
> 158. The new name sounds good to me.
> 
> Jun
> 
> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
> 
>> Hi Jun,
>> Thanks for the response.
>> 
>> 147. I am trying to get a correspondence between the concepts and
>> metrics of consumer groups and share groups. In both cases,
>> the client doesn’t strictly know when the rebalance starts. All it knows
>> is when it has work to do in order to perform its part of a rebalance.
>> I am proposing that share groups and consumer groups use
>> equivalent logic.
>> 
>> I could remove the rebalance metrics from the client because I do
>> understand that they are making a judgement about when a rebalance
>> starts, but it’s their own part of the rebalance they are measuring.
>> 
>> I tend to think these metrics are better than no metrics and
>> will at least enable administrators to see how much rebalance
>> activity the members of share groups are experiencing.
>> 
>> 150. The simple assignor does not take existing assignments into
>> consideration. The ShareGroupPartitionAssignor interface would
>> permit this, but the simple assignor does not currently use it.
>> 
>> The simple assignor assigns partitions in two ways:
>> a) Distribute the members across the partitions by hashed member ID.
>> b) If any partitions have no members assigned, distribute the members
>> across these partitions round-robin.
>> 
>> The (a) partitions will be quite stable. The (b) partitions will be less
>> stable. By using existing assignment information, it could make (b)
>> partition assignment more stable, whether the assignments are
>> persisted or not. Perhaps it would be worth changing the simple
>> assignor in order to make (b) more stable.
>> 
>> I envisage more sophisticated assignors in the future which could use
>> existing assignments and also other dynamic factors such as lag.
>> 
>> If it transpires that there is significant benefit in persisting
>> assignments
>> specifically to help smooth assignment in the event of GC change,
>> it would be quite an easy enhancement. I am not inclined to persist
>> the assignments in this KIP.
>> 
>> 158. Ah, yes. I see. Of course, I want the names to be consistent and
>> understandable too. I suggest renaming
>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
>> I haven’t changed the KIP yet, so let me know if that’s OK.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID> wrote:
>>> 
>>> Hi, Andrew,
>>> 
>>> Thanks for the reply.
>>> 
>>> 147. " it makes a judgement about whether an assignment received is equal
>>> to what it already is using."
>>> If a client receives an assignment different from what it has, it
>> indicates
>>> the end of the rebalance. But how does the client know when the rebalance
>>> starts? In the shareHeartbeat design, the new group epoch is propagated
>>> together with the new assignment in the response.
>>> 
>>> 150. It could be a potential concern if each GC change forces significant
>>> assignment changes. Does the default assignor take existing assignments
>>> into consideration?
>>> 
>>> 155. Good point. Sounds good.
>>> 
>>> 158. My concern with the current naming is that it's not clear what the
>>> difference is between ReadShareGroupOffsetsState and ReadShareGroupState.
>>> The state in the latter is also offsets.
>>> 
>>> Jun
>>> 
>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
>> andrew_schofi...@live.com>
>>> wrote:
>>> 
>>>> Hi Jun,
>>>> Thanks for your reply.
>>>> 
>>>> 147. Perhaps the easiest is to take a look at the code in
>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl.
>>>> This class is part of the new consumer group protocol
>>>> code in the client. It makes state transitions based on
>>>> the heartbeat requests and responses, and it makes a
>>>> judgement about whether an assignment received is
>>>> equal to what it already is using. When a state transition
>>>> is deemed to be the beginning or end of a rebalance
>>>> from the point of view of this client, it counts towards the
>>>> rebalance metrics.
>>>> 
>>>> Share groups will follow the same path.
>>>> 
>>>> 150. I do not consider it a concern. Rebalancing a share group
>>>> is less disruptive than rebalancing a consumer group. If the assignor
>>>> has information about existing assignments, it can use it. It is
>>>> true that this information cannot be replayed from a topic and will
>>>> sometimes be unknown as a result.
>>>> 
>>>> 151. I don’t want to rename TopicPartitionsMetadata to
>>>> simply TopicPartitions (it’s information about the partitions of
>>>> a topic) because we then have an array of plurals.
>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
>>>> 
>>>> 152. Fixed.
>>>> 
>>>> 153. It’s the GC. Fixed.
>>>> 
>>>> 154. The UNKNOWN “state” is essentially a default for situations where
>>>> the code cannot understand data it received. For example, let’s say that
>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
>>>> introduced another state THINKING, a tool built with Kafka 4.0 would not
>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that the
>>>> state was something that it could not understand.
>>>> 
>>>> 155. No, it’s at the level of the share-partition. If the offsets for just
>>>> one share-partition are reset, only the state epoch for that partition is
>>>> updated.
>>>> 
>>>> 156. Strictly speaking, it’s redundant. I think having the StartOffset
>>>> separate gives helpful clarity and I prefer to retain it.
>>>> 
>>>> 157. Yes, you are right. There’s no reason why a leader change needs
>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the ShareUpdate.
>>>> 
>>>> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
>>>> having “State” in the name makes it clear that this is one of the family of
>>>> inter-broker RPCs served by the share coordinator. The admin RPCs
>>>> such as DescribeShareGroupOffsets do not include “State”.
>>>> 
>>>> 159. Fixed.
>>>> 
>>>> 160. Fixed.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>> 
>>>>> Hi, Andrew,
>>>>> 
>>>>> Thanks for the reply.
>>>>> 
>>>>> 147. "The measurement is certainly from the point of view of the
>> client,
>>>>> but it’s driven by sending and receiving heartbeats rather than whether
>>>> the
>>>>> client triggered the rebalance itself."
>>>>> Hmm, how does a client know which heartbeat response starts a
>> rebalance?
>>>>> 
>>>>> 150. PartitionAssignor takes existing assignments into consideration.
>>>> Since
>>>>> GC doesn't persist the assignment for share groups, it means that
>>>>> ShareGroupPartitionAssignor can't reliably depend on existing
>>>> assignments.
>>>>> Is that a concern?
>>>>> 
>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename
>>>>> TopicPartitionsMetadata and TopicMetadata since there is no metadata?
>>>>> 
>>>>> 152. ShareGroupMetadataKey: "versions": "3"
>>>>> The versions should be 11.
>>>>> 
>>>>> 153. ShareGroupDescription.coordinator(): The description says "The
>> share
>>>>> group coordinator". Is that the GC or SC?
>>>>> 
>>>>> 154. "A share group has only three states - EMPTY , STABLE and DEAD".
>>>>> What about UNKNOWN?
>>>>> 
>>>>> 155. WriteShareGroupState: StateEpoch is at the group level, not
>>>> partition
>>>>> level, right?
>>>>> 
>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's the same
>> as
>>>>> the smallest FirstOffset in StateBatches?
>>>>> 
>>>>> 157. Every leader change forces a ShareSnapshotValue write to persist
>> the
>>>>> new leader epoch. Is that a concern? An alternative is to include
>>>>> leaderEpoch in ShareUpdateValue.
>>>>> 
>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets. Should we
>>>> rename
>>>>> it to something like ReadShareGroupStartOffset?
>>>>> 
>>>>> 159. members are assigned members round-robin => members are assigned
>>>>> round-robin
>>>>> 
>>>>> 160. "may called": typo
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield <
>>>> andrew_schofi...@live.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Jun,
>>>>>> Thanks for the reply and sorry for the delay in responding.
>>>>>> 
>>>>>> 123. Yes, I didn’t quite get your point earlier. The member
>>>>>> epoch is bumped by the GC when it sends a new assignment.
>>>>>> When the member sends its next heartbeat, it echoes back
>>>>>> the member epoch, which will confirm the receipt of the
>>>>>> assignment. It would send the same member epoch even
>>>>>> after recovery of a network disconnection, so that should
>>>>>> be sufficient to cope with this eventuality.
>>>>>> 
>>>>>> 125. Yes, I have added it to the table which now matches
>>>>>> the text earlier in the KIP. Thanks.
>>>>>> 
>>>>>> 140. Yes, I have added it to the table which now matches
>>>>>> the text earlier in the KIP. I’ve also added more detail for
>>>>>> the case where the entire share group is being deleted.
>>>>>> 
>>>>>> 141. Yes! Sorry for confusing things.
>>>>>> 
>>>>>> Back to the original question for this point. To delete a share
>>>>>> group, should the GC write a tombstone for each
>>>>>> ShareGroupMemberMetadata record?
>>>>>> 
>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata
>>>>>> records. But, deletion of a share group is only possible when
>>>>>> the group is already empty, so the tombstones will have
>>>>>> been written as a result of the members leaving the group.
>>>>>> 
>>>>>> 143. Yes, that’s right.
>>>>>> 
>>>>>> 147. The measurement is certainly from the point of view
>>>>>> of the client, but it’s driven by sending and receiving heartbeats
>>>>>> rather than whether the client triggered the rebalance itself.
>>>>>> The client decides when it enters and leaves reconciliation
>>>>>> of the assignment, and measures this period.
>>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>> 
>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi, Andrew,
>>>>>>> 
>>>>>>> Thanks for the reply.
>>>>>>> 
>>>>>>> 123. "Rather than add the group epoch to the ShareGroupHeartbeat, I
>>>> have
>>>>>>> decided to go for TopicPartitions in ShareGroupHeartbeatRequest which
>>>>>>> mirrors ConsumerGroupHeartbeatRequest."
>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is that
>>>> enough
>>>>>>> for confirming the receipt of the new assignment?
>>>>>>> 
>>>>>>> 125. This also means that "Alter share group offsets" needs to write
>> a
>>>>>>> ShareGroupPartitionMetadata record, if the partition is not already
>>>>>>> initialized.
>>>>>>> 
>>>>>>> 140. In the table for "Delete share group offsets", we need to add a
>>>> step
>>>>>>> to write a ShareGroupPartitionMetadata record with DeletingTopics.
>>>>>>> 
>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the
>> __consumer_offsets
>>>>>>> topic, which is a compacted topic, right?
>>>>>>> 
>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests to GC,
>>>> which
>>>>>>> then forwards it to SC?
>>>>>>> 
>>>>>>> 147. I guess a client only knows the rebalance triggered by itself,
>> but
>>>>>> not
>>>>>>> the ones triggered by other members or topic/partition changes?
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield <
>>>>>> andrew_schofi...@live.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Jun,
>>>>>>>> Thanks for the response.
>>>>>>>> 
>>>>>>>> 123. Of course, ShareGroupHeartbeat started off as
>>>> ConsumerGroupHeartbeat
>>>>>>>> and then unnecessary fields were removed. In the network issue case,
>>>>>>>> there is not currently enough state being exchanged to be sure an
>>>>>>>> assignment
>>>>>>>> was received.
>>>>>>>> 
>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat, I have
>>>>>> decided
>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest which
>> mirrors
>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group member does
>>>>>>>> confirm the assignment it is using, and that can be used by the GC
>> to
>>>>>>>> safely
>>>>>>>> stop repeating the assignment in heartbeat responses.
>>>>>>>> 
>>>>>>>> 125. Ah, yes. This is indeed something possible with a consumer
>> group
>>>>>>>> and share groups should support it too. This does of course imply
>> that
>>>>>>>> ShareGroupPartitionMetadataValue needs an array of partitions, not
>>>>>>>> just the number.
>>>>>>>> 
>>>>>>>> 140. Yes, good spot. There is an inconsistency here in consumer
>> groups
>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at the
>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only operates
>>>>>>>> at the topic level.
>>>>>>>> 
>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at the
>>>>>> partition
>>>>>>>> level only. You can reset them, but if you’re actively using a topic
>>>>>> with
>>>>>>>> a share group, I don’t see why you’d want to delete offsets rather
>>>> than
>>>>>>>> reset. If you’ve finished using a topic with a share group and want
>> to
>>>>>>>> clean
>>>>>>>> up, use delete.
>>>>>>>> 
>>>>>>>> So, I’ve changed the AdminClient.deleteConsumerGroupOffsets to be
>>>>>>>> topic-based and the RPCs behind it.
>>>>>>>> 
>>>>>>>> The GC reconciles the cluster state with the
>>>> ShareGroupPartitionMetadata
>>>>>>>> to spot deletion of topics and the like. However, when the offsets
>> for
>>>>>>>> a topic were deleted manually, the topic very likely still exists so
>>>>>>>> reconciliation
>>>>>>>> alone is not going to be able to continue an interrupted operation
>>>> that
>>>>>>>> has started. So, I’ve added DeletingTopics back into
>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so failover of a
>> GC
>>>>>>>> can continue where it left off rather than leaving fragments across
>>>> the
>>>>>>>> SCs.
>>>>>>>> 
>>>>>>>> 141. That is not required. Because this is not a compacted topic, it
>>>> is
>>>>>>>> not necessary to write tombstones for every key. As long as there
>> is a
>>>>>>>> clear and unambiguous record for the deletion of the group, that is
>>>>>> enough.
>>>>>>>> The tombstone for ShareGroupPartitionMetadata is theoretically not
>>>>>>>> required but it’s a single record, rather than one per member, so I
>>>>>> prefer
>>>>>>>> to leave it as a record that the interactions with the SC have been
>>>>>>>> completed.
>>>>>>>> 
>>>>>>>> 142.
>>>>>>>> 142.1. It will prompt the user to confirm they want to continue.
>>>>>>>> This is in common with `kafka-consumer-groups.sh` which historically
>>>>>>>> has defaulted to --dry-run behaviour, but is due to change to
>>>> prompting
>>>>>>>> if neither --dry-run nor --execute is specified “in a future major
>>>>>>>> release”.
>>>>>>>> 
>>>>>>>> 142.2. It should support partition-level reset, but only topic-level
>>>>>>>> delete.
>>>>>>>> I have updated the usage text accordingly. This is in common with
>>>>>>>> kafka-consumer-groups.sh.
>>>>>>>> 
>>>>>>>> 142.3. --dry-run displays the operation that would be executed.
>>>>>>>> 
>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to the
>>>>>>>> usage text.
>>>>>>>> 
>>>>>>>> 143. DescribeShareGroupOffsets is served by the group coordinator
>>>>>>>> for this kind of reason.
>>>>>>>> 
>>>>>>>> 144. That’s the default. If you haven’t asked to release or reject,
>> it
>>>>>>>> accepts.
>>>>>>>> This is analogous to fetching and committing offsets in a consumer
>>>>>> group.
>>>>>>>> 
>>>>>>>> 145. I think this is a good idea, but I would prefer to defer it
>>>> until a
>>>>>>>> future
>>>>>>>> metrics KIP that I have planned. In KIP-932, I have added basic
>>>> metrics
>>>>>>>> only.
>>>>>>>> For example, you’ll see that there’s no concept of lag yet, which
>>>> surely
>>>>>>>> will have relevance for share groups. I plan to create and deliver
>> the
>>>>>>>> metrics KIP before share groups are declared ready for production.
>>>>>>>> I want the new metrics to be developed with the experience of
>> running
>>>>>>>> the code.
>>>>>>>> 
>>>>>>>> 146. Milliseconds. KIP updated.
>>>>>>>> 
>>>>>>>> 147. There is a membership state machine in the client that
>>>>>>>> changes states as the ShareGroupHeartbeat requests and responses
>>>>>>>> flow. The duration of a rebalance will be shorter from the point of
>>>>>>>> view of the share-group consumer because it doesn’t have to worry
>>>> about
>>>>>>>> rebalance callbacks and committing offsets as the partitions move
>>>>>>>> around, but the overall flow is very similar. So, it’s the state
>>>>>>>> transitions
>>>>>>>> that drive the collection of the rebalance metrics.
>>>>>>>> 
>>>>>>>> 148. Strangely, none of the existing uses of records-per-request-avg
>>>>>>>> actually have records-per-request-max. I tend to err on the side of
>>>>>>>> consistency, but can’t think of any reason not to add this. Done.
>>>>>>>> 
>>>>>>>> 149. There are several error codes for WriteShareGroupStateResponse:
>>>>>>>> 
>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re looking for.
>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC is not available.
>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the topic.
>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this group.
>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for this
>>>>>>>> topic-partition.
>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch.
>>>>>>>> INVALID_REQUEST - There was a problem with the request.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID>
>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, Andrew,
>>>>>>>>> 
>>>>>>>>> Thanks for the response.
>>>>>>>>> 
>>>>>>>>> 123. I thought the network issue can be covered with the group
>> epoch.
>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up the
>> epoch
>>>>>>>> first,
>>>>>>>>> but doesn't expose the new epoch to members until the assignment is
>>>>>>>>> complete (including initializing the sharePartitionState). Once the
>>>>>>>>> assignment is complete, GC includes the bumped up epoch and the new
>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat Request
>>>>>>>> includes
>>>>>>>>> the new epoch, it means that the member has received the new
>>>> assignment
>>>>>>>> and
>>>>>>>>> GC can exclude the assignment in the heartbeatResponse.
>>>>>>>>> 
>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a topic-partition
>> which
>>>>>> is
>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not really part
>> of
>>>>>> the
>>>>>>>>> group yet. So I think it’s permitted to fail the
>>>> AlterShareGroupOffsets
>>>>>>>>> because the topic-partition would not be part of
>>>> ListShareGroupOffsets
>>>>>>>>> result."
>>>>>>>>> A user may want to initialize SPSO (e.g. based on timestamp) before
>>>> the
>>>>>>>>> application is first used. If we reject AlterShareGroupOffsets when
>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be forced
>> to
>>>>>>>> start
>>>>>>>>> the application with the wrong SPSO first and then reset it, which
>>>> will
>>>>>>>> be
>>>>>>>>> inconvenient.
>>>>>>>>> 
>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of SPSO
>> for
>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue only
>>>> tracks
>>>>>>>>> consecutive partitions.
>>>>>>>>> 
>>>>>>>>> 141. Delete share group: Should GC also write a tombstone for each
>>>>>>>>> ShareGroupMemberMetadata record?
>>>>>>>>> 
>>>>>>>>> 142. kafka-share-groups.sh
>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is specified
>>>> for
>>>>>>>>> reset-offsets?
>>>>>>>>> 142.2 Should it support --delete-offsets and --reset-offsets at the
>>>>>>>>> partition level to match the AdminClient api?
>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't carry
>> the
>>>>>>>>> dry-run flag.
>>>>>>>>> 142.4 --state [String]: What are the valid state values?
>>>>>>>>> 
>>>>>>>>> 143. DescribeShareGroupOffsets is served by the share-partition
>>>> leader.
>>>>>>>> How
>>>>>>>>> could a user describe the share group offset when there is no
>> member?
>>>>>>>>> 
>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to accept
>>>>>>>> consumed
>>>>>>>>> messages?
>>>>>>>>> 
>>>>>>>>> 145. It would be useful to add a broker side metric that measures
>> the
>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g. the
>>>> fraction
>>>>>>>> of
>>>>>>>>> the time that SPL is blocked because
>>>>>>>> group.share.partition.max.record.locks
>>>>>>>>> is reached.
>>>>>>>>> 
>>>>>>>>> 146. heartbeat-response-time-max: What's the unit?
>>>>>>>>> 
>>>>>>>>> 147. rebalance related metrics: How does a share consumer know when
>>>>>> there
>>>>>>>>> is a rebalance and how does it measure the rebalance time?
>>>>>>>>> 
>>>>>>>>> 148. records-per-request-avg: Should we pair it with
>>>>>>>>> records-per-request-max?
>>>>>>>>> 
>>>>>>>>> 149. If the shareGroupState is not available, what error code is
>> used
>>>>>> in
>>>>>>>>> WriteShareGroupStateResponse?
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield <
>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Jun,
>>>>>>>>>> Thanks for your reply.
>>>>>>>>>> 
>>>>>>>>>> 123. The GC only sends the assignment in ShareGroupHeartbeat
>>>>>>>>>> response if the member has just joined, or the assignment has been
>>>>>>>>>> recalculated. This latter condition is met when the GC fails over.
>>>>>>>>>> 
>>>>>>>>>> With a consumer group, it works a little differently. The
>>>> heartbeating
>>>>>>>>>> occurs asynchronously with the reconciliation process in the
>> client.
>>>>>>>>>> The client has reconciliation callbacks, as well as offsets to
>>>> commit
>>>>>>>>>> before it can confirm that revocation has occurred. So, it might
>> take
>>>>>>>>>> multiple heartbeats to complete the installation of a new target
>>>>>>>>>> assignment in the client.
>>>>>>>>>> 
>>>>>>>>>> With a share group, it’s more brutal. The GC sends the assignment
>>>>>>>>>> in a heartbeat response, and the client is assumed to have acted
>> on
>>>>>>>>>> it immediately. Since share group assignment is really about
>>>> balancing
>>>>>>>>>> consumers across the available partitions, rather than safely
>>>> handing
>>>>>>>>>> ownership of partitions between consumers, there is less
>>>> coordination.
>>>>>>>>>> 
>>>>>>>>>> I wonder whether there is one situation that does need
>> considering.
>>>>>>>>>> If the network connection between the client and the GC is lost,
>> it
>>>> is
>>>>>>>>>> possible that a response containing the assignment is lost. Then,
>>>>>>>>>> the connection will be reestablished, and the assignment will only
>>>> be
>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of
>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one
>>>>>>>>>> way to close that.
>>>>>>>>>> 
>>>>>>>>>> 125. Yes, you are right. The reconciliation that I describe in the
>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets is
>> called
>>>>>>>>>> for a topic-partition which is not yet in
>>>>>>>> ShareGroupPartitionMetadataValue,
>>>>>>>>>> it’s not really part of the group yet. So I think it’s permitted
>> to
>>>>>> fail
>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition would not
>> be
>>>>>>>>>> part of ListShareGroupOffsets result.
>>>>>>>>>> 
>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition which is
>>>> in
>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling
>>>>>>>>>> InitializeShareGroupState to set its offset would be sufficient
>>>>>> without
>>>>>>>>>> writing ShareGroupPartitionMetadata again.
>>>>>>>>>> 
>>>>>>>>>> 138. Added.
>>>>>>>>>> 
>>>>>>>>>> 139. This week, I have been experimenting with using the existing
>>>>>>>>>> consumer metrics with share consumer code. That was my plan to get
>>>>>>>>>> started with metrics. While they work to some extent, I am not
>>>>>> entirely
>>>>>>>>>> happy with the result.
>>>>>>>>>> 
>>>>>>>>>> I have added a basic set of share consumer-specific metrics to
>>>> KIP-932
>>>>>>>>>> which I think is a step in the right direction. I anticipate a
>>>> future
>>>>>>>> KIP
>>>>>>>>>> that defines many more metrics and tackles concepts such as what
>>>>>>>>>> “lag” means for a share group. The metrics I’ve included are
>> simply
>>>>>>>>>> counting and measuring the operations, which is a good start.
>>>>>>>>>> 
>>>>>>>>>> 54. I think either with or without
>> InitializingTopics/DeletingTopics
>>>>>> in
>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think there is
>>>>>>>>>> a deeper point behind what you said. The GC needs to be responsive
>>>>>>>>>> to topic creation and deletion, and addition of partitions in
>> cases
>>>>>>>> where
>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics. So, we
>> are
>>>>>>>>>> probably not saving anything by having those fields. As a result,
>> I
>>>>>> have
>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated the KIP
>>>>>>>>>> accordingly. The GC needs to reconcile its view of the initialised
>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and it will
>>>>>>>>>> initialize or delete share-group state accordingly.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> I have made one more change to the KIP. The SharePartitionAssignor
>>>>>>>>>> interface has been renamed to
>>>>>>>>>> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor and
>> it
>>>>>>>>>> now extends the PartitionAssignor interface. It’s essentially a
>>>> marker
>>>>>>>>>> of which partition assignors can be used with share groups.
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>> 
>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID>
>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>> 
>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to the GC."
>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment to GC
>> to
>>>>>>>>>>> avoid GC including the assignment in the heartbeat response
>>>>>>>>>> continuously. I
>>>>>>>>>>> assume this is done by including the new group epoch in the
>>>> heartbeat
>>>>>>>>>>> response.
>>>>>>>>>>> 
>>>>>>>>>>> 125. It's possible that the share partition has never been
>>>>>> initialized
>>>>>>>>>> when
>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write
>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it would
>>>>>>>> reinitialize
>>>>>>>>>>> the share partition and lose the effect of
>> AlterShareGroupOffsets.
>>>> If
>>>>>>>> the
>>>>>>>>>>> partition has already been initialized and it's recorded
>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to write
>>>>>>>>>>> ShareGroupPartitionMetadata again when handling
>>>>>> AlterShareGroupOffsets.
>>>>>>>>>>> 
>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted?
>>>>>>>>>>> 
>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer?
>>>>>>>>>>> 
>>>>>>>>>>> 54. "I don’t think there is any redundancy. The
>>>>>>>> ShareGroupMemberMetadata
>>>>>>>>>>> does include a list of subscribed topics. However, if there is a
>>>>>> period
>>>>>>>>>> of
>>>>>>>>>>> time in which no members are subscribed to a particular topic, it
>>>>>> does
>>>>>>>>>> not
>>>>>>>>>>> mean that the topic’s ShareGroupState should be immediately
>>>> removed,
>>>>>>>> but
>>>>>>>>>> it
>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata records
>>>>>>>>>> containing
>>>>>>>>>>> that topic."
>>>>>>>>>>> I am still trying to understand the value of InitializingTopics
>> and
>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They are used
>>>> to
>>>>>>>>>>> remember the intention of an operation. However, GC still needs
>> to
>>>>>>>> handle
>>>>>>>>>>> the case when the intention is not safely recorded. If GC wants
>> to
>>>>>>>>>>> initialize a new topic/partition, a simpler approach is for it to
>>>>>> send
>>>>>>>> an
>>>>>>>>>>> InitializeShareGroupState to the share coordinator and after
>>>>>> receiving
>>>>>>>> a
>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue with the
>>>>>>>>>>> initialized partition included in InitializedTopics. This saves a
>>>>>>>> record
>>>>>>>>>>> write. It's possible for GC to fail in between the two steps. On
>>>>>>>>>> failover,
>>>>>>>>>>> the new GC just repeats the process. The case that you mentioned
>>>>>> above
>>>>>>>>>> can
>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics of
>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member subscribes to it,
>>>> we
>>>>>>>> can
>>>>>>>>>>> still keep the ShareGroupState as long as the topic still exists.
>>>> The
>>>>>>>>>> same
>>>>>>>>>>> optimization could be applied to DeletingTopics too.
>>>>>>>>>>> 
>>>>>>>>>>> Jun
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <
>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>> 
>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute the
>>>>>>>> assignment
>>>>>>>>>>>> for every member. However, the impact of re-assignment is not
>> that
>>>>>>>>>> onerous.
>>>>>>>>>>>> If the recomputed assignments are the same, which they may well
>>>> be,
>>>>>>>>>> there
>>>>>>>>>>>> is no impact on the members at all.
>>>>>>>>>>>> 
>>>>>>>>>>>> On receiving the new assignment, the member adjusts the
>>>>>>>> topic-partitions
>>>>>>>>>>>> in its share sessions, removing those which were revoked and
>>>> adding
>>>>>>>>>> those
>>>>>>>>>>>> which were assigned. It is able to acknowledge the records it
>>>>>> fetched
>>>>>>>>>> from
>>>>>>>>>>>> the partitions which have just been revoked, and it doesn’t need
>>>> to
>>>>>>>>>> confirm
>>>>>>>>>>>> the assignment back to the GC.
>>>>>>>>>>>> 
>>>>>>>>>>>> 125. I don’t think the GC needs to write
>>>> ShareGroupPartitionMetadata
>>>>>>>>>>>> when processing AlterShareGroupOffsets. This is because the
>>>>>> operation
>>>>>>>>>>>> happens as a result of an explicit administrative action and it
>> is
>>>>>>>>>> possible
>>>>>>>>>>>> to return a specific error code for each topic-partition. The
>>>> cases
>>>>>>>>>> where
>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is added or
>>>>>>>> removed
>>>>>>>>>>>> from the subscribed topics, or the number of partitions changes.
>>>>>>>>>>>> 
>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for a
>> cluster
>>>> to
>>>>>>>>>>>> prevent
>>>>>>>>>>>> a group from having an excessively low value. Config added.
>>>>>>>>>>>> 
>>>>>>>>>>>> 131. I have changed it to
>> group.share.partition.max.record.locks.
>>>>>>>>>>>> 
>>>>>>>>>>>> 136.  When GC failover occurs, the GC gaining ownership of a
>>>>>> partition
>>>>>>>>>> of
>>>>>>>>>>>> the __consumer_offsets topic replays the records to build its
>>>> state.
>>>>>>>>>>>> In the case of a share group, it learns:
>>>>>>>>>>>> 
>>>>>>>>>>>> * The share group and its group epoch (ShareGroupMetadata)
>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata)
>>>>>>>>>>>> * The list of share-partitions (ShareGroupPartitionMetadata)
>>>>>>>>>>>> 
>>>>>>>>>>>> It will recompute the assignments in order to respond to
>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the group
>>>> epoch.
>>>>>>>>>>>> 
>>>>>>>>>>>> I will update the KIP accordingly to confirm the behaviour.
>>>>>>>>>>>> 
>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the
>>>>>>>>>>>> group-coordinator-metrics
>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs offset
>>>>>> commit,
>>>>>>>>>>>> the share group equivalent is performed by the SPL. So, I’ve
>>>> grouped
>>>>>>>> the
>>>>>>>>>>>> concepts which relate to the group in group-coordinator-metrics.
>>>>>>>>>>>> 
>>>>>>>>>>>> The SC reports the metrics in the share-coordinator-metrics
>> group.
>>>>>>>>>>>> 
>>>>>>>>>>>> 137.2: There is one metric in both groups - partition-load-time.
>>>> In
>>>>>>>> the
>>>>>>>>>> SC
>>>>>>>>>>>> group,
>>>>>>>>>>>> it refers to the time loading data from the share-group state
>>>> topic
>>>>>> so
>>>>>>>>>> that
>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC group,
>>>>>>>>>>>> it refers to the time to read the state from the persister.
>> Apart
>>>>>> from
>>>>>>>>>> the
>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to be very
>>>>>> close.
>>>>>>>>>>>> 
>>>>>>>>>>>> Later, for a cluster which is using a custom persister, the
>>>>>>>>>>>> share-coordinator
>>>>>>>>>>>> metrics would likely not be reported, and the persister would
>> have
>>>>>> its
>>>>>>>>>> own
>>>>>>>>>>>> metrics.
>>>>>>>>>>>> 
>>>>>>>>>>>> 137.3: Correct. Fixed.
>>>>>>>>>>>> 
>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the internal
>>>> topic.
>>>>>>>>>>>> I’ve tweaked the description.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao <j...@confluent.io.INVALID>
>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 123. "The share group does not persist the target assignment."
>>>>>>>>>>>>> What's the impact of this? Everytime that GC fails over, it
>> needs
>>>>>> to
>>>>>>>>>>>>> recompute the assignment for every member. Do we expect the
>>>> member
>>>>>>>>>>>>> assignment to change on every GC failover?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 125. Should the GC also write ShareGroupPartitionMetadata?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when
>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds good.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 130. Should we have a group.share.min.record.lock.duration.ms
>> to
>>>>>>>> pair
>>>>>>>>>>>> with
>>>>>>>>>>>>> group.share.max.record.lock.duration.ms?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 131. Sounds good. The name
>>>> group.share.record.lock.partition.limit
>>>>>>>>>>>> doesn't
>>>>>>>>>>>>> seem very intuitive. How about something
>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I guess it
>>>> needs
>>>>>>>> to
>>>>>>>>>>>>> compute member reassignment and check if there is any new
>>>>>>>>>> topic/partition
>>>>>>>>>>>>> matching the share subscription. Does it bump up the group
>> epoch?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 137. Metrics:
>>>>>>>>>>>>> 137.1 It would be useful to document who reports each metric.
>> Is
>>>> it
>>>>>>>> any
>>>>>>>>>>>>> broker, GC, SC or SPL?
>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at SPL or
>> SC?
>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the share-group
>>>> state
>>>>>>>>>> from
>>>>>>>>>>>>> the share-group state partitions loaded in the last 30
>> seconds."
>>>>>>>>>>>>> The window depends on metrics.num.samples and
>>>>>>>> metrics.sample.window.ms
>>>>>>>>>>>>> and is not always 30 seconds, right?
>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more? Does it
>>>>>>>> include
>>>>>>>>>>>> the
>>>>>>>>>>>>> time to write to the internal topic?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jun
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
>>>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 120. Thanks. Fixed.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
>>>>>>>>>>>>>> the update applies to. It should of course be the snapshot
>> that
>>>>>>>>>> precedes
>>>>>>>>>>>>>> it in the log. It’s just there to provide a consistency check.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing StateEpoch.
>>>> It
>>>>>>>>>>>>>> isn’t any more.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact, there is
>>>>>>>>>> considerable
>>>>>>>>>>>>>> divergence between the KIP and the code for this record value
>>>>>> schema
>>>>>>>>>>>>>> which I expect will be resolved when the migration code has
>> been
>>>>>>>>>>>>>> completed.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 123. The share group does not persist the target assignment.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 124. Share groups have three kinds of record:
>>>>>>>>>>>>>> i) ShareGroupMetadata
>>>>>>>>>>>>>> - this contains the group epoch and is written whenever the
>>>> group
>>>>>>>>>>>>>> epoch changes.
>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata
>>>>>>>>>>>>>> - this does not contain the group epoch.
>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata
>>>>>>>>>>>>>> - this currently contains the epoch, but I think that is
>>>>>>>> unnecessary.
>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata definition
>>>>>>>>>>>>>> contains the group epoch, but the value appears never to be
>> set.
>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is removing
>> it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have removed the Epoch from ShareGroupPartitionMetadata.
>>>>>>>>>>>>>> The only purpose of persisting the epoch for a share group
>>>> is
>>>>>> so
>>>>>>>>>>>> that
>>>>>>>>>>>>>> when a group coordinator takes over the share group, it is
>> able
>>>> to
>>>>>>>>>>>>>> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
>>>>>>>>>>>>>> is used for this.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case and
>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP
>> updated.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted when it
>> has
>>>>>> no
>>>>>>>>>>>>>> members, so the tombstones for ShareGroupMemberMetadata will
>>>>>>>>>>>>>> have been written when the members left. I have clarified
>> this.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group epoch.
>> When
>>>>>> the
>>>>>>>>>>>>>> group coordinator is initializing the share-group state the
>>>> first
>>>>>>>> time
>>>>>>>>>>>> that
>>>>>>>>>>>>>> a share-partition is being added to an assignment in the
>> group,
>>>>>> the
>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the group epoch
>>>>>>>>>>>>>> increases over time, the share coordinator is entirely
>> unaware.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> When the first consumer for a share-partition fetches records
>>>>>> from a
>>>>>>>>>>>>>> share-partition leader, the SPL calls the share coordinator to
>>>>>>>>>>>>>> ReadShareGroupState. If the SPL has previously read the
>>>>>> information
>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms it's up
>>>> to
>>>>>>>> date
>>>>>>>>>>>> by
>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Even if many consumers are joining at the same time, any
>>>>>>>>>> share-partition
>>>>>>>>>>>>>> which is being initialized will not be included in their
>>>>>>>> assignments.
>>>>>>>>>>>> Once
>>>>>>>>>>>>>> the initialization is complete, the next rebalance will assign
>>>> the
>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>> to some consumers which will discover this by
>>>> ShareGroupHeartbeat
>>>>>>>>>>>>>> response. And then, the fetching begins.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s read the
>>>> state
>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait up to
>>>>>> MaxWaitMs
>>>>>>>>>>>>>> and then it can return an empty set of records if it’s still
>> not
>>>>>>>>>> ready.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a topic
>> with
>>>>>>>> many
>>>>>>>>>>>>>> partitions is added to the subscribed topics for a share
>> group,
>>>>>> the
>>>>>>>>>> fact
>>>>>>>>>>>>>> that the assignments will only start to include the partitions
>>>> as
>>>>>>>>>> their
>>>>>>>>>>>>>> initialization completes should soften the impact.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature when it’s
>>>>>>>> finished
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and
>>>>>>>> `group.version`.
>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the
>>>>>>>>>> `group.share.enable`
>>>>>>>>>>>>>> configuration will turn it on.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have described `group.share.enable` as an internal
>>>> configuration
>>>>>>>> in
>>>>>>>>>>>>>> the KIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms` applies
>>>> to
>>>>>>>>>> groups
>>>>>>>>>>>>>> which do not specify a group-level configuration for lock
>>>>>> duration.
>>>>>>>>>> The
>>>>>>>>>>>>>> minimum and maximum for this configuration are intended to
>> give
>>>> it
>>>>>>>>>>>>>> sensible bounds.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If a group does specify its own `
>>>>>>>> group.share.record.lock.duration.ms
>>>>>>>>>> `,
>>>>>>>>>>>>>> the broker-level `group.share.max.record.lock.duration.ms`
>>>> gives
>>>>>>>> the
>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value for all
>>>>>>>> groups.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> While editing, I renamed `
>>>> group.share.record.lock.duration.max.ms
>>>>>> `
>>>>>>>> to
>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for consistency
>> with
>>>>>> the
>>>>>>>>>>>>>> rest of the min/max configurations.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 131. This is the limit per partition so you can go wider with
>>>>>>>> multiple
>>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>> I have set the initial value low for safety. I expect to be
>> able
>>>>>> to
>>>>>>>>>>>>>> increase this
>>>>>>>>>>>>>> significantly when we have mature code which has been
>>>>>> battle-tested.
>>>>>>>>>>>>>> Rather than try to guess how high it can safely go, I’ve erred
>>>> on
>>>>>>>> the
>>>>>>>>>>>> side
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> caution and expect to open it up in a future KIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two group
>>>>>>>>>>>>>> configurations,
>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms and
>>>>>>>>>>>>>> group.share.heartbeat.interval.ms. The configurations you
>>>>>> mentioned
>>>>>>>>>>>>>> are the bounds for the group-level configurations.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to mirror the
>>>>>>>> existing
>>>>>>>>>>>>>> `group.consumer.max.size`.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid assignors
>>>> for
>>>>>>>> the
>>>>>>>>>>>>>> cluster.
>>>>>>>>>>>>>> When the assignors are configurable at the group level, the
>>>> group
>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>> will only be permitted to name an assignor which is in this
>>>> list.
>>>>>>>> For
>>>>>>>>>>>> now,
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>> is no group configuration for assignor, so all groups get the
>>>> one
>>>>>>>> and
>>>>>>>>>>>> only
>>>>>>>>>>>>>> assignor in the list.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a cluster
>> with a
>>>>>>>> small
>>>>>>>>>>>>>> number of brokers and a lot of share group activity, it may be
>>>>>> appropriate
>>>>>>>> to
>>>>>>>>>>>>>> increase
>>>>>>>>>>>>>> this. We will be able to give tuning advice once we have
>>>>>> experience
>>>>>>>> of
>>>>>>>>>>>> the
>>>>>>>>>>>>>> performance impact of increasing it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao <j...@confluent.io.INVALID>
>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 120. There is still reference to ConsumerGroupMetadataKey.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change it
>> since
>>>>>> it's
>>>>>>>>>>>> not a
>>>>>>>>>>>>>>> snapshot?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, but
>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know how the
>>>> epoch
>>>>>> is
>>>>>>>>>>>> being
>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in the
>> share
>>>>>>>>>> group?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 123. There is no equivalent of
>>>>>> ConsumerGroupTargetAssignmentMember
>>>>>>>>>> for
>>>>>>>>>>>>>>> ShareGroup. How does the shareGroup persist the member
>>>>>> assignment?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a topic-partition is
>>>>>> assigned
>>>>>>>>>> to a
>>>>>>>>>>>>>>> member of a share group for the first time, the group
>>>> coordinator
>>>>>>>>>>>> writes
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the __consumer_offsets
>>>>>>>> topic".
>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata with the
>>>>>> bumped
>>>>>>>>>>>>>> epoch?
>>>>>>>>>>>>>>> In general, is there a particular order to bump up the epoch
>> in
>>>>>>>>>>>> different
>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the
>>>>>>>>>> sharePartitionLeader?
>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>> would be useful to make this clear in other state changing
>>>>>>>> operations
>>>>>>>>>>>>>> too.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and share
>>>>>>>>>> coordinator
>>>>>>>>>>>>>>> Only empty share groups support this operation. The group
>>>>>>>> coordinator
>>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>> an InitializeShareGroupState  request to the share
>> coordinator.
>>>>>> The
>>>>>>>>>>>> share
>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the new state
>>>>>> epoch
>>>>>>>> to
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> __share_group_state  topic."
>>>>>>>>>>>>>>> Does the operation need to write ShareGroupPartitionMetadata
>>>> and
>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also need to
>>>> write a
>>>>>>>>>>>>>> tombstone
>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new consumer
>> joining
>>>>>> the
>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the
>>>>>>>> ShareCoordinator
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the
>>>> SharePartition
>>>>>>>>>> leader
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> reinitialize the state with ReadShareGroupOffsetsState. If
>> many
>>>>>>>>>>>> consumers
>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would there be
>>>> too
>>>>>>>>>> much
>>>>>>>>>>>>>> load
>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the
>>>>>> group.version
>>>>>>>>>>>>>>> feature, should we make it an internal config?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this seems
>>>> redundant
>>>>>>>> with
>>>>>>>>>>>>>>> group.share.enable?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 130. Why do we have both group.share.record.lock.duration.ms
>>>> and
>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with its own
>> max
>>>>>>>>>> value?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults to 200.
>>>>>> This
>>>>>>>>>>>> limits
>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right? If
>> there
>>>>>> are
>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>> records per batch, it could be even smaller.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 132. Why do we need all three of
>>>> group.share.session.timeout.ms,
>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and
>>>>>>>>>>>>>> group.share.max.session.timeout.ms?
>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for
>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 133. group.share.max.size: Would
>>>>>> group.share.max.members.per.group
>>>>>>>>>> be a
>>>>>>>>>>>>>>> more intuitive name?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a list?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share coordinator
>>>> or
>>>>>>>> per
>>>>>>>>>>>>>> broker?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
>>>>>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to
>>>> FirstOffset.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP. Personally, I
>>>> don’t
>>>>>>>> mind
>>>>>>>>>>>>>> having
>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 114. Done.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record because
>>>> there
>>>>>>>> is
>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> much
>>>>>>>>>>>>>>>> data to consume, committing on every record is OK. It’s
>>>>>>>> inefficient
>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> acceptable.
>>>>>>>>>>>>>>>> If the first poll returns just one record, but many more
>> have
>>>>>>>> piled
>>>>>>>>>> up
>>>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>>> the first one was being processed, the next poll has the
>>>>>>>> opportunity
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be
>> committed
>>>>>>>>>>>> together.
>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to return
>>>>>> batches
>>>>>>>>>> of
>>>>>>>>>>>>>>>> records when the records are available is the approach we
>> will
>>>>>>>> take
>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 116. Good idea. Done.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration section. Let
>> me
>>>>>>>> know
>>>>>>>>>>>> what
>>>>>>>>>>>>>>>> you think.
>>>>>>>>>>>>>>>> I am discussing the configuration to enable the feature in
>> the
>>>>>>>>>> mailing
>>>>>>>>>>>>>>>> list with
>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in this
>> area
>>>>>>>>>> still.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao <j...@confluent.io.INVALID
>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for the updated KIP.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple times
>>>> when
>>>>>>>> the
>>>>>>>>>>>>>> group
>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same state epoch
>>>> was
>>>>>>>>>>>> reused
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> initialize the state."
>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure that it
>> has
>>>>>>>>>> received
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused?
>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger a
>>>> rebalance"
>>>>>>>>>> that
>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group epoch
>> to
>>>> be
>>>>>>>>>>>> bumped
>>>>>>>>>>>>>>>> too?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become FirstOffset
>>>> in
>>>>>>>> ALL
>>>>>>>>>>>>>>>> schemas
>>>>>>>>>>>>>>>>> defined in the KIP."
>>>>>>>>>>>>>>>>> Yes, that seems better to me.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for 3."
>>>>>>>>>>>>>>>>> Should we document it?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer configuration is
>>>>>>>>>> deprecated
>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the shareConsumer
>>>>>>>>>>>> configuration.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 115. I am wondering if it's a good idea to always commit
>> acks
>>>>>> on
>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each batch may
>>>> only
>>>>>>>>>>>> contain
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> single record and each poll() only returns a single batch.
>>>> This
>>>>>>>>>> will
>>>>>>>>>>>>>>>> cause
>>>>>>>>>>>>>>>>> each record to be committed individually. Is there a way
>> for
>>>> a
>>>>>>>> user
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> optimize this?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated acls?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and RPCs, it
>>>> would
>>>>>>>> be
>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>> to document the upgrade process.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>>>>>>> Thanks for your questions.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 41.
>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch in the
>>>>>>>> response
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> ReadShareGroupState. When it becomes a share-partition
>>>> leader,
>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the things it
>>>> learns
>>>>>>>> is
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> current state epoch. Then it uses the state epoch in all
>>>>>>>>>> subsequent
>>>>>>>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to prevent
>>>>>> writes
>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> a previous state epoch, which are very unlikely but which
>>>>>> would
>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>> that a leader was using an out-of-date epoch and was
>> likely
>>>> no
>>>>>>>>>>>> longer
>>>>>>>>>>>>>>>>>> the current leader at all, perhaps due to a long pause for
>>>>>> some
>>>>>>>>>>>>>> reason.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to avoid yet another
>>>>>>>>>>>>>>>>>> inter-broker hop.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 42.
>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share group offset is
>>>>>>>>>>>>>>>>>> altered using AdminClient.alterShareGroupOffsets, the group coordinator
>>>>>>>>>>>>>>>>>> WILL update the state epoch. I don’t think it needs to update the group
>>>>>>>>>>>>>>>>>> epoch at the same time (although it could) because the group epoch will
>>>>>>>>>>>>>>>>>> have been bumped when the group became empty. If the share group offset
>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains empty, it would be
>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to initialize the state.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO as a result of the
>>>>>>>>>>>>>>>>>> usual flow of record delivery, it does not update the state epoch.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the alteration because,
>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response will contain the
>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to be the last-resort
>>>>>>>>>>>>>>>>>> way of catching this.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first ShareFetch request,
>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to ReadShareGroupState.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain constant, but, when there
>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might change. As a result,
>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share sessions transitions
>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the share group
>>>>>>>>>>>>>>>>>> transitioning from empty to non-empty, for the share-partition leader to
>>>>>>>>>>>>>>>>>> issue ReadShareGroupOffsetsState to validate the state epoch. If its
>>>>>>>>>>>>>>>>>> state epoch is out of date, it can then ReadShareGroupState to
>>>>>>>>>>>>>>>>>> re-initialize.
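>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> In pseudo-code (a sketch only, with invented helper names, not the
>>>>>>>>>>>>>>>>>> actual share-partition leader code):
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>     // on the transition of the set of share sessions from 0 to 1
>>>>>>>>>>>>>>>>>>     if (openShareSessions() == 1) {
>>>>>>>>>>>>>>>>>>         int latestEpoch =
>>>>>>>>>>>>>>>>>>             readShareGroupOffsetsState(groupId, topicIdPartition).stateEpoch();
>>>>>>>>>>>>>>>>>>         if (latestEpoch > cachedStateEpoch) {
>>>>>>>>>>>>>>>>>>             // the epoch moved on while the group was empty, so
>>>>>>>>>>>>>>>>>>             // re-initialize from ReadShareGroupState
>>>>>>>>>>>>>>>>>>             reinitializeShareGroupState();
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>     }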
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having reviewed all of the
>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should become FirstOffset in
>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is just used in
>>>>>>>>>>>>>>>>>> record batches, which is already a known concept.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Please let me know if you agree.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level index for protocol
>>>>>>>>>>>>>>>>>> changes.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users will be using the
>>>>>>>>>>>>>>>>>> console share consumer. When I use the console consumer, I always get
>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default group ID for console
>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to match the console
>>>>>>>>>>>>>>>>>> consumer better and give more of an idea where this mysterious group
>>>>>>>>>>>>>>>>>> has come from.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use compaction and we can
>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better course for KIP-932.
>>>>>>>>>>>>>>>>>> Personally, until I’ve written it down and lived with the ideas for a
>>>>>>>>>>>>>>>>>> few days, I won’t be able to choose which I prefer.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by the end of this week.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was
>>>>>>>>>>>>>>>>>> originally created from KIP-848. This part of the schema does not
>>>>>>>>>>>>>>>>>> apply and I have removed it. I have also renamed AssignedTopicPartitions
>>>>>>>>>>>>>>>>>> to simply TopicPartitions which aligns with the actual definition of
>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatResponse.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error codes for
>>>>>>>>>>>>>>>>>> ShareFetchResponse.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to these RPCs as you
>>>>>>>>>>>>>>>>>> suggest. It’s a good improvement for problem determination.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain. Actually, 1 is Acquired which
>>>>>>>>>>>>>>>>>> is not persisted, and I have another non-terminal state in mind for 3.
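>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Purely as an illustration of that numbering (this enum is not part of
>>>>>>>>>>>>>>>>>> the KIP):
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>     enum DeliveryState {
>>>>>>>>>>>>>>>>>>         AVAILABLE((byte) 0),
>>>>>>>>>>>>>>>>>>         ACQUIRED((byte) 1),   // in-memory only, never persisted
>>>>>>>>>>>>>>>>>>         ACKED((byte) 2),
>>>>>>>>>>>>>>>>>>         ARCHIVED((byte) 4);   // 3 is reserved for a future non-terminal state
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>         final byte value;
>>>>>>>>>>>>>>>>>>         DeliveryState(byte value) { this.value = value; }
>>>>>>>>>>>>>>>>>>     }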
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of whether OffsetAndMetadata
>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional Metadata part comes
>>>>>>>>>>>>>>>>>> from when a regular consumer commits offsets. However, it is correct
>>>>>>>>>>>>>>>>>> that there will never be metadata with a share group. I have changed
>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
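>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> So, roughly speaking (illustrative shape only, not the exact
>>>>>>>>>>>>>>>>>> signatures), the admin APIs deal in plain offsets:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>     // A share group offset is just an offset, so Long replaces
>>>>>>>>>>>>>>>>>>     // OffsetAndMetadata in the maps the Admin API works with.
>>>>>>>>>>>>>>>>>>     Map<TopicPartition, Long> offsets =
>>>>>>>>>>>>>>>>>>         Map.of(new TopicPartition("my-topic", 0), 0L);
>>>>>>>>>>>>>>>>>>     // e.g. admin.alterShareGroupOffsets("my-share-group", offsets);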
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this process that a
>>>>>>>>>>>>>>>>>> version bump can be a logical not just a physical change to the schema.
>>>>>>>>>>>>>>>>>> KIP updated.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all of the states at
>>>>>>>>>>>>>>>>>> this point. I think there is definitely scope for another KIP focused
>>>>>>>>>>>>>>>>>> on administration of share groups that might want this information so
>>>>>>>>>>>>>>>>>> someone could build a UI and other tools on top. Doing that well is
>>>>>>>>>>>>>>>>>> quite a lot of work in its own right so I would prefer not to do that
>>>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 109. Yes, they’re inconsistent and follow the consumer groups
>>>>>>>>>>>>>>>>>> equivalents, which are also inconsistent. In general, the
>>>>>>>>>>>>>>>>>> KafkaAdmin.delete… methods use the Map<XXX, KafkaFuture<YYY>> pattern
>>>>>>>>>>>>>>>>>> like DeleteShareGroupsResult.
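>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> For example, that pattern is consumed like this (illustrative;
>>>>>>>>>>>>>>>>>> deleteShareGroups as proposed in the KIP):
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>     DeleteShareGroupsResult result =
>>>>>>>>>>>>>>>>>>         admin.deleteShareGroups(List.of("my-share-group"));
>>>>>>>>>>>>>>>>>>     result.deletedGroups().forEach((groupId, future) ->
>>>>>>>>>>>>>>>>>>         future.whenComplete((v, e) -> {
>>>>>>>>>>>>>>>>>>             if (e != null) {
>>>>>>>>>>>>>>>>>>                 System.err.println("Failed to delete " + groupId + ": " + e);
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>         }));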
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain consistent with consumer
>>>>>>>>>>>>>>>>>> groups? Happy to change it.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level of detail about
>>>>>>>>>>>>>>>>>> epochs. The MemberDescription does include the assignment, but not the
>>>>>>>>>>>>>>>>>> list of subscribed topic names.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing because there’s no
>>>>>>>>>>>>>>>>>> single class that includes the states of all group types.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 112. I think it’s good practice for the API to have
>>>>>>>>>>>>>>>>>> ListShareGroupOffsetsSpec. It makes evolution and extension of the API
>>>>>>>>>>>>>>>>>> much easier. Also, it matches the precedent set by
>>>>>>>>>>>>>>>>>> ListConsumerGroupOffsetsSpec.
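>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Illustrative usage, mirroring the consumer group equivalent (treat the
>>>>>>>>>>>>>>>>>> share group class and method names as indicative rather than final):
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>     ListShareGroupOffsetsSpec spec = new ListShareGroupOffsetsSpec()
>>>>>>>>>>>>>>>>>>         .topicPartitions(Set.of(new TopicPartition("my-topic", 0)));
>>>>>>>>>>>>>>>>>>     ListShareGroupOffsetsResult result =
>>>>>>>>>>>>>>>>>>         admin.listShareGroupOffsets(Map.of("my-share-group", spec));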
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResult.errors() is the same. I think you just
>>>>>>>>>>>>>>>>>> have to look in the exception details and the same pattern is being
>>>>>>>>>>>>>>>>>> followed here.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to writing a proposal for how
>>>>>>>>>>>>>>>>>> to persist share-group state that doesn’t use log compaction.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine Olshan on read-committed
>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the detailed review.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi, Andrew,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 41.
>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the group epoch to set
>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group coordinator initialize the
>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It seems simpler to have
>>>>>>>>>>>>>>>>>>> the partition leader initialize both the state and the SPSO together?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 42.
>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be bumped when the share
>>>>>>>>>>>>>>>>>>> group offset is altered."
>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group offsets says "The share
>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with the new state epoch to
>>>>>>>>>>>>>>>>>>> the __share_group_state topic." So, which is correct? We have two paths
>>>>>>>>>>>>>>>>>>> to update the state in the share coordinator, one from the group
>>>>>>>>>>>>>>>>>>> coordinator and another from the partition leader. I thought the
>>>>>>>>>>>>>>>>>>> benefit of bumping up the epoch is to fence off a late request in the
>>>>>>>>>>>>>>>>>>> previous epoch from another path.
>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share group offset in the
>>>>>>>>>>>>>>>>>>> share coordinator, how does the partition leader know the share group
>>>>>>>>>>>>>>>>>>> state has been altered so that it could clear its in-memory state?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base offset for the batch
>>>>>>>>>>>>>>>>>>> and can be confusing. FirstOffset is clearer and matches LastOffset.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in the top level index for
>>>>>>>>>>>>>>>>>>> Kafka protocol changes?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based expiration later. But
>>>>>>>>>>>>>>>>>>> using a default group in console-share-consumer probably won't help
>>>>>>>>>>>>>>>>>>> reduce the garbage. In the common case, the user of the console
>>>>>>>>>>>>>>>>>>> consumer likely wants to see the recently produced records for
>>>>>>>>>>>>>>>>>>> verification. If the default group doesn't provide that (because of
>>>>>>>>>>>>>>>>>>> the stale state), the user will likely just use a new group. It's true
>>>>>>>>>>>>>>>>>>> that we don't garbage collect idle topics. However, share groups are
>>>>>>>>>>>>>>>>>>> similar to consumer groups, which do support garbage collection.
>>>>>>>>>>>>>>>>>>> Typically, we read topics more often than we create them.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we could use customized
>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems cleaner and more
>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups fit in memory, we
>>>>>>>>>>>>>>>>>>> could generate snapshots more efficiently than going through
>>>>>>>>>>>>>>>>>>> compaction. Having a separate log per share partition is probably too
>>>>>>>>>>>>>>>>>>> much overhead. It's more efficient to put the state changes for
>>>>>>>>>>>>>>>>>>> multiple share partitions in a single log.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
>>>>>>>>>>>>>>>>>>> SessionTimeoutMs?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error
>>>>>>>>>>>>>>>>>>> could we have when assigning partitions? What are the corresponding
>>>>>>>>>>>>>>>>>>> error codes?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 102. Do we still need
>>>>>>>>>>>>>>>>>>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for
>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the errorCode in
>>>>>>>>>>>>>>>>>>> ShareFetchResponse, ShareAcknowledgeResponse, ReadShareGroupStateResponse,
>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What
>>>>>>>>>>>>>>>>>>> about 1 and 3? Are we leaving them out intentionally?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could
>>>>>>>>>>>>>>>>>>> we remove it from AdminClient and KafkaShareConsumer?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the version since it now
>>>>>>>>>>>>>>>>>>> supports a new group type "share"?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it expose all the
>>>>>>>>>>>>>>>>>>> states from ReadShareGroupStateResponse, instead of just SPSO?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final TopicPartition partition)
>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes
>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
>>>>>>>>>>>>>>>>>>> Should we make them more consistent?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields like GroupEpoch,
>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know which group has an
>>>>>>>>>>>>>>>>>>> error?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the Share Coordinator over
>>>>>>>>>>>>>>>>>>>> RPCs. In the general case, it’s an inter-broker call. It is probably
>>>>>>>>>>>>>>>>>>>> possible to optimise for the situation in which the appropriate GC and
>>>>>>>>>>>>>>>>>>>> SC shards are co-located, but the KIP does not delve that deep into
>>>>>>>>>>>>>>>>>>>> potential performance optimisations.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, but I do worry about
>>>>>>>>>>>>>>>>>>>> retrospectively introducing a naming convention for groups. I feel that
>>>>>>>>>>>>>>>>>>>> naming conventions will typically be the responsibility of the cluster
>>>>>>>>>>>>>>>>>>>> administrators based on organizational factors, such as the name of an
>>>>>>>>>>>>>>>>>>>> application.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
>>>>>>>>>>>>>>>>>>>> correct but the group is the wrong type. The nearest existing error
>>>>>>>>>>>>>>>>>>>> code that gets that across is INCONSISTENT_GROUP_PROTOCOL. Perhaps this
>>>>>>>>>>>>>>>>>>>> is really showing that a new error code would be better.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have removed them.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every change, but only a subset is
>>>>>>>>>>>>>>>>>>>> relevant for rebalancing. A check is done against the names of the
>>>>>>>>>>>>>>>>>>>> subscribed topics to see if any relevant changes may have occurred.
>>>>>>>>>>>>>>>>>>>> Then the changes which trigger a rebalance are topic creation,
>>>>>>>>>>>>>>>>>>>> deletion, change in partitions, or rack IDs for the replicas. I have
>>>>>>>>>>>>>>>>>>>> updated the KIP to make this more clear.
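>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> As a sketch of the check (hypothetical helper types, not the actual
>>>>>>>>>>>>>>>>>>>> coordinator code):
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>     // A metadata change triggers a rebalance only if it touches a
>>>>>>>>>>>>>>>>>>>>     // subscribed topic and is one of: topic created or deleted, a change
>>>>>>>>>>>>>>>>>>>>     // in the number of partitions, or a change in the rack IDs of the
>>>>>>>>>>>>>>>>>>>>     // replicas.
>>>>>>>>>>>>>>>>>>>>     boolean triggersRebalance(TopicChange change, Set<String> subscribedTopics) {
>>>>>>>>>>>>>>>>>>>>         if (!subscribedTopics.contains(change.topicName())) {
>>>>>>>>>>>>>>>>>>>>             return false;
>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>         return change.isCreated() || change.isDeleted()
>>>>>>>>>>>>>>>>>>>>             || change.partitionCountChanged() || change.replicaRackIdsChanged();
>>>>>>>>>>>>>>>>>>>>     }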
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it is much less important
>>>>>>>>>>>>>>>>>>>> that the assignment survives a GC change. There’s no need to transfer
>>>>>>>>>>>>>>>>>>>> partitions safely from member to member in the way that is required
>>>>>>>>>>>>>>>>>>>> for consumer groups, so as an optimisation, the assignments for a share
>>>>>>>>>>>>>>>>>>>> group are not persisted. It wouldn’t do any harm, but it just seems
>>>>>>>>>>>>>>>>>>>> unnecessary.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to acknowledge a record that it
>>>>>>>>>>>>>>>>>>>> no longer has the right to acknowledge, the INVALID_RECORD_STATE error
>>>>>>>>>>>>>>>>>>>> code is used.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If the application uses the KafkaShareConsumer.commitSync method, it
>>>>>>>>>>>>>>>>>>>> will see an InvalidRecordState exception returned. Alternatively, the
>>>>>>>>>>>>>>>>>>>> application can register an acknowledgement commit callback which will
>>>>>>>>>>>>>>>>>>>> be called with the status of the acknowledgements that have succeeded
>>>>>>>>>>>>>>>>>>>> or failed.
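>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> As a sketch of the callback route (the interface name and signature
>>>>>>>>>>>>>>>>>>>> here are indicative of the KIP’s direction rather than final):
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>     consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
>>>>>>>>>>>>>>>>>>>>         if (exception instanceof InvalidRecordStateException) {
>>>>>>>>>>>>>>>>>>>>             // These acknowledgements were rejected; the records will be
>>>>>>>>>>>>>>>>>>>>             // redelivered, possibly to another consumer.
>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>     });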
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with the durable
>>>>>>>>>>>>>>>>>>>> share-partition state in this KIP. The significant choices I made are
>>>>>>>>>>>>>>>>>>>> that:
>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is replicated between brokers.
>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the storage.
>>>>>>>>>>>>>>>>>>>> * Only one topic is required.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for this kind of data, as
>>>>>>>>>>>>>>>>>>>> evidenced by the DeltaIndex technique I employed.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to improve upon it.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log compactor for this
>>>>>>>>>>>>>>>>>>>> topic that understands the rules for ShareCheckpoint and ShareDelta
>>>>>>>>>>>>>>>>>>>> records. Essentially, for each share-partition, the latest
>>>>>>>>>>>>>>>>>>>> ShareCheckpoint and any subsequent ShareDelta records must not be
>>>>>>>>>>>>>>>>>>>> cleaned. Anything else can be cleaned. We could then be sure that
>>>>>>>>>>>>>>>>>>>> multiple ShareDelta records with the same key would survive cleaning
>>>>>>>>>>>>>>>>>>>> and we could abandon the DeltaIndex technique.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per share-partition. Let’s
>>>>>>>>>>>>>>>>>>>> imagine that we had a share-state topic per topic being consumed in a
>>>>>>>>>>>>>>>>>>>> share group, with the same number of partitions as the topic being
>>>>>>>>>>>>>>>>>>>> consumed. We could write many more deltas between checkpoints, and
>>>>>>>>>>>>>>>>>>>> just take periodic checkpoints to keep control of the storage used.
>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use KafkaAdmin.deleteRecords()
>>>>>>>>>>>>>>>>>>>> to prune all of the older records.
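>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> For example, pruning below the latest checkpoint would just use the
>>>>>>>>>>>>>>>>>>>> existing Admin#deleteRecords API (the topic name and offset below are
>>>>>>>>>>>>>>>>>>>> illustrative):
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>     void pruneBelowCheckpoint(Admin admin, TopicPartition statePartition,
>>>>>>>>>>>>>>>>>>>>                               long checkpointOffset) throws Exception {
>>>>>>>>>>>>>>>>>>>>         // deletes everything before the offset of the latest ShareCheckpoint
>>>>>>>>>>>>>>>>>>>>         admin.deleteRecords(Map.of(statePartition,
>>>>>>>>>>>>>>>>>>>>                 RecordsToDelete.beforeOffset(checkpointOffset)))
>>>>>>>>>>>>>>>>>>>>             .all().get();
>>>>>>>>>>>>>>>>>>>>     }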
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous, but we are talking one
>>>>>>>>>>>>>>>>>>>> per topic per share group that it’s being consumed in. These topics
>>>>>>>>>>>>>>>>>>>> would not be compacted.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister interface is intended to be
>>>>>>>>>>>>>>>>>>>> pluggable one day. I know the scheme in the KIP is not ideal. It seems
>>>>>>>>>>>>>>>>>>>> likely to me that future KIPs will improve upon it.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to change this KIP.
>>>>>>>>>>>>>>>>>>>> While option (a) is probably workable, it does seem a bit of a hack to
>>>>>>>>>>>>>>>>>>>> have a customised log compactor just for this topic.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State is overloaded.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 79. See (77).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still trying to grok the whole
>>>>>>>>>>>>>>>>>>>>> thing. Sorry if some of my questions are basic :)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Concepts:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with the Share Coordinator
>>>>>>>>>>>>>>>>>>>>> over RPC or directly in-process?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular consumer groups, could
>>>>>>>>>>>>>>>>>>>>> we define a reserved share group prefix? E.g., the operator defines
>>>>>>>>>>>>>>>>>>>>> "sg_" as a prefix for share groups only, and if a regular consumer
>>>>>>>>>>>>>>>>>>>>> group tries to use that name it fails.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group, or a share consumer
>>>>>>>>>>>>>>>>>>>>> tries to use a regular group, would INVALID_GROUP_ID make more sense
>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> --------
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Share Group Membership:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 73. What goes in the Metadata field for TargetAssignment#Member and
>>>>>>>>>>>>>>>>>>>>> Assignment?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we rebalance when the partition
>>>>>>>>>>>>>>>>>>>>> metadata changes. Would this be for any change, or just certain ones?
>>>>>>>>>>>>>>>>>>>>> For example, if a follower drops out of the ISR and comes back, we
>>>>>>>>>>>>>>>>>>>>> probably don't need to rebalance.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator does *not* persist the
>>>>>>>>>>>>>>>>>>>>> assignment" Can you explain why this is not needed?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 76. "If the consumer just failed to heartbeat due to a temporary
>>>>>>>>>>>>>>>>>>>>> pause, it could in theory continue to fetch and acknowledge records.
>>>>>>>>>>>>>>>>>>>>> When it finally sends a heartbeat and realises it’s been kicked out of
>>>>>>>>>>>>>>>>>>>>> the group, it should stop fetching records because its assignment has
>>>>>>>>>>>>>>>>>>>>> been revoked, and rejoin the group."
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still deliver some buffered
>>>>>>>>>>>>>>>>>>>>> records, but if the share group coordinator has expired its session,
>>>>>>>>>>>>>>>>>>>>> it wouldn't accept acknowledgments for that share consumer. In such a
>>>>>>>>>>>>>>>>>>>>> case, is any kind of error raised to the application like "hey, I know
>>>>>>>>>>>>>>>>>>>>> we gave you these records, but really we shouldn't have" ?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> -----
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is written at least every so
>>>>>>>>>>>>>>>>>>>>> often, could we add a new log compactor that avoids compacting
>>>>>>>>>>>>>>>>>>>>> ShareDelta-s that are still "active" (i.e., not yet superseded by a new
>>>>>>>>>>>>>>>>>>>>> ShareCheckpoint)? Mechanically, this could be done by keeping the LSO
>>>>>>>>>>>>>>>>>>>>> no greater than the oldest "active" ShareCheckpoint. This might let us
>>>>>>>>>>>>>>>>>>>>> remove the DeltaIndex thing.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how
>>>>>>>>>>>>>>>>>>>>> about MessageState? (State is kind of overloaded/ambiguous)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current persistence model is that
>>>>>>>>>>>>>>>>>>>>> all the share state is stored in one topic. It seems like we are going
>>>>>>>>>>>>>>>>>>>>> to be storing a lot more state than we do in __consumer_offsets since
>>>>>>>>>>>>>>>>>>>>> we're dealing with message-level acks. With aggressive checkpointing
>>>>>>>>>>>>>>>>>>>>> and compaction, we can mitigate the storage requirements, but the
>>>>>>>>>>>>>>>>>>>>> throughput could be a limiting factor. Have we considered other
>>>>>>>>>>>>>>>>>>>>> possibilities for persistence?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>>> 