Hi, Andrew,

Thanks for addressing all the comments. The KIP looks good to me now.

Jun

On Mon, May 6, 2024 at 2:15 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Jun,
> I have removed AdminClient.listGroups and the associated classes and
> interfaces.
>
> Version 6 of the ListGroups RPC remains because it adds support for share
> groups.
> This is needed to list share groups for the admin client and the
> command-line
> tools.
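> 
> To illustrate how that might surface once share groups are supported
> end-to-end, here is a minimal sketch. The listShareGroups() method and the
> result accessors are assumptions based on the consumer-group equivalents,
> not final API:
> 
>   import java.util.Collection;
>   import java.util.Properties;
>   import org.apache.kafka.clients.admin.Admin;
>   import org.apache.kafka.clients.admin.AdminClientConfig;
>   import org.apache.kafka.clients.admin.ShareGroupListing; // package assumed to follow ConsumerGroupListing
> 
>   public class ListShareGroups {
>       public static void main(String[] args) throws Exception {
>           Properties props = new Properties();
>           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>           try (Admin admin = Admin.create(props)) {
>               // Hypothetical mirror of listConsumerGroups()
>               Collection<ShareGroupListing> groups = admin.listShareGroups().all().get();
>               groups.forEach(g -> System.out.println(g.groupId()));
>           }
>       }
>   }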
>
> Thanks,
> Andrew
>
> > On 6 May 2024, at 19:26, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Removing AdminClient.listGroups() and the ListGroups RPC for now sounds
> good
> > to me.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, May 6, 2024 at 11:10 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for the reply.
> >>
> >> 162. I’ve mentioned before that I plan another KIP for administration
> >> of groups. I think this is heading into that territory. I would like
> >> listGroups() to do a comprehensive job of returning all of the groups,
> >> and that does include groups which aren’t currently covered by
> GroupType.
> >> There’s a single namespace for all groups, and if someone is to make
> sense
> >> of the groups in a cluster, I think they need to be able to see them
> all.
> >> This is really just putting an API on top of the ListGroups RPC that
> >> already
> >> exists.
> >>
> >> I don’t think it would be desirable for connect-based groups to
> >> have GroupType.UNKNOWN. There are other custom group types.
> >> This all needs sorting out, I think.
> >>
> >> I propose to remove AdminClient.listGroups() from this KIP, and put
> >> it in the administration KIP.
> >>
> >> Let me know what you think.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>
> >>> On 6 May 2024, at 18:04, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Hi, Andrew,
> >>>
> >>> Thanks for the reply.
> >>>
> >>> 162. It's fine to start with just the group type. Since ListGroups()
> is a
> >>> generic API, I want to make sure that it covers all existing groups.
> >>> Currently, GroupType only has "classic" and "consumer", both of which
> >> seem
> >>> to be related to groups formed by consumers since it's part of
> >>> ConsumerGroupDescription. Does ListGroup() return connect based groups
> >> and
> >>> if so, what's the GroupType? If ListGroup() doesn't cover all groups,
> >>> should we name it more accurately?
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Fri, May 3, 2024 at 7:51 PM Andrew Schofield <
> >> andrew_schofi...@live.com>
> >>> wrote:
> >>>
> >>>> Hi Jun,
> >>>> Thanks for your reply.
> >>>>
> >>>> 161. ShareGroupListing and ShareGroupDescription are using
> >>>> the same pattern as ConsumerGroupListing and
> >>>> ConsumerGroupDescription. I have gone for consistency which
> >>>> I think is probably best here. It’s what I would expect if I had
> >> previously
> >>>> used the admin API for consumer groups and was looking to use it for
> >>>> share groups. I agree it’s a bit weird.
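> >>>>
> >>>> For example, someone used to the consumer group admin API would expect
> >>>> to write roughly the following (a fragment only; imports and Admin
> >>>> construction omitted, and describeShareGroups() plus the accessors are
> >>>> assumed to mirror describeConsumerGroups()):
> >>>>
> >>>>   // Sketch: mirrors Admin.describeConsumerGroups(); names are assumed, not final.
> >>>>   ShareGroupDescription d = admin.describeShareGroups(List.of("my-share-group"))
> >>>>       .describedGroups().get("my-share-group").get();
> >>>>   System.out.println(d.groupId() + " state=" + d.state());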
> >>>>
> >>>> 162. GroupListing contains the only information which is properly
> >>>> in common between a ConsumerGroupListing and a ShareGroupListing.
> >>>> ListGroupsResponse.ProtocolType is interpreted to provide the
> >>>> group type. I know that the ListGroups RPC also includes the group
> >>>> state, but that’s as a string and there’s no common enum for the
> states
> >>>> of all types of group. As a result, I have exposed group type but not
> >>>> state on this API.
> >>>>
> >>>> Previously in the discussion for this KIP, I mentioned that I would
> >>>> create another KIP for the administration of groups, in particular
> >>>> how the administrator can ensure that particular group IDs
> >>>> are used for the group type they desire. At the moment, I think
> >>>> keeping ListGroups in this KIP is a good idea. If we actually want
> >>>> to make it more sophisticated, perhaps that would be better with
> >>>> the group administration KIP.
> >>>>
> >>>> 163. It will be one higher than the latest version at the time we are
> >>>> ready to deliver this feature for real. When we are on the cusp of
> >>>> delivery, I’ll update the KIP with the final value.
> >>>>
> >>>> 164. KRaft only. All the RPCs are “broker” only. None of the code will
> >>>> be merged until after 3.8 has branched.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 4 May 2024, at 00:12, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Hi, Andrew,
> >>>>>
> >>>>> Thanks for the reply. A few more comments.
> >>>>>
> >>>>> 161. ShareGroupListing.state() returns an optional, but
> >>>>> ShareGroupDescription.state() does not. Should we make them
> consistent?
> >>>>> Also, it seems a bit weird to return optional with an UNKNOWN state.
> >>>>>
> >>>>> 162. Should GroupListing include ProtocolType and GroupState too?
> >>>>>
> >>>>> 163. What is the value of group.version to gate the queuing feature?
> >>>>>
> >>>>> 164. Is the queueing feature only supported on KRaft clusters? For
> >>>> example,
> >>>>> the feature tool seems to be built only for the KRaft cluster.
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Fri, May 3, 2024 at 10:32 AM Andrew Schofield <
> >>>> andrew_schofi...@live.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jun,
> >>>>>> Thanks for your reply.
> >>>>>>
> >>>>>> 147. Yes, I see what you mean. The rebalance latency will indeed
> >>>>>> be very short by comparison. I have removed the rebalance latency
> >>>>>> metrics from the client and retained the rebalance count and rate.
> >>>>>>
> >>>>>> 150. Yes, I think so. I have tweaked the text so that the simple
> >>>>>> assignor will take into account existing assignment information when
> >>>>>> it has it, which would just minimise unnecessary churn of (b).
> >>>>>>
> >>>>>> 158. I’ve changed it to ReadShareGroupStateSummary.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>
> >>>>>>> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>>>
> >>>>>>> Hi, Andrew,
> >>>>>>>
> >>>>>>> Thanks for the reply.
> >>>>>>>
> >>>>>>> 147. There seems to be some difference between consumer groups and
> >>>> share
> >>>>>>> groups. In the consumer groups, if a client receives a heartbeat
> >>>> response
> >>>>>>> to revoke some partitions, it may have to commit offsets before
> >>>> revoking
> >>>>>>> partitions or it may have to call the rebalance callbacks provided
> by
> >>>> the
> >>>>>>> user. This may take some time and can be reflected in the rebalance
> >>>> time
> >>>>>>> metric. In the share groups, none of that exists. If a client
> >> receives
> >>>>>> some
> >>>>>>> added/revoked partitions, it accepts them immediately, right? So,
> >> does
> >>>>>> that
> >>>>>>> practically make the rebalance time always 0?
> >>>>>>>
> >>>>>>> 150. I guess in the common case, there will be many more members
> than
> >>>>>>> partitions. So the need for (b) will be less common. We can
> probably
> >>>>>> leave
> >>>>>>> the persisting of the assignment out for now.
> >>>>>>>
> >>>>>>> 158. The new name sounds good to me.
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <
> >>>>>> andrew_schofi...@live.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Jun,
> >>>>>>>> Thanks for the response.
> >>>>>>>>
> >>>>>>>> 147. I am trying to get a correspondence between the concepts and
> >>>>>>>> metrics of consumer groups and share groups. In both cases,
> >>>>>>>> the client doesn’t strictly know when the rebalance starts. All it
> >>>> knows
> >>>>>>>> is when it has work to do in order to perform its part of a
> >> rebalance.
> >>>>>>>> I am proposing that share groups and consumer groups use
> >>>>>>>> equivalent logic.
> >>>>>>>>
> >>>>>>>> I could remove the rebalance metrics from the client because I do
> >>>>>>>> understand that they are making a judgement about when a rebalance
> >>>>>>>> starts, but it’s their own part of the rebalance they are
> measuring.
> >>>>>>>>
> >>>>>>>> I tend to think these metrics are better than no metrics and
> >>>>>>>> will at least enable administrators to see how much rebalance
> >>>>>>>> activity the members of share groups are experiencing.
> >>>>>>>>
> >>>>>>>> 150. The simple assignor does not take existing assignments into
> >>>>>>>> consideration. The ShareGroupPartitionAssignor interface would
> >>>>>>>> permit this, but the simple assignor does not currently use it.
> >>>>>>>>
> >>>>>>>> The simple assignor assigns partitions in two ways:
> >>>>>>>> a) Distribute the members across the partitions by hashed member
> ID.
> >>>>>>>> b) If any partitions have no members assigned, distribute the
> >> members
> >>>>>>>> across these partitions round-robin.
> >>>>>>>>
> >>>>>>>> The (a) partitions will be quite stable. The (b) partitions will
> be
> >>>> less
> >>>>>>>> stable. By using existing assignment information, it could make
> (b)
> >>>>>>>> partition assignment more stable, whether the assignments are
> >>>>>>>> persisted or not. Perhaps it would be worth changing the simple
> >>>>>>>> assignor in order to make (b) more stable.
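> >>>>>>>>
> >>>>>>>> In pseudo-Java, the two steps above amount to something like this,
> >>>>>>>> given a non-empty List<String> members and an int numPartitions
> >>>>>>>> (a simplified sketch of the described behaviour, not the actual
> >>>>>>>> assignor code; the map is keyed by partition for brevity):
> >>>>>>>>
> >>>>>>>>   // (a) each member lands on a partition chosen by hashing its member ID
> >>>>>>>>   Map<Integer, List<String>> assignment = new HashMap<>();
> >>>>>>>>   for (String memberId : members) {
> >>>>>>>>       int p = Math.floorMod(memberId.hashCode(), numPartitions);
> >>>>>>>>       assignment.computeIfAbsent(p, k -> new ArrayList<>()).add(memberId);
> >>>>>>>>   }
> >>>>>>>>   // (b) partitions left without any member get members assigned round-robin
> >>>>>>>>   int next = 0;
> >>>>>>>>   for (int p = 0; p < numPartitions; p++) {
> >>>>>>>>       if (!assignment.containsKey(p)) {
> >>>>>>>>           assignment.computeIfAbsent(p, k -> new ArrayList<>())
> >>>>>>>>                     .add(members.get(next++ % members.size()));
> >>>>>>>>       }
> >>>>>>>>   }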
> >>>>>>>>
> >>>>>>>> I envisage more sophisticated assignors in the future which could
> >> use
> >>>>>>>> existing assignments and also other dynamic factors such as lag.
> >>>>>>>>
> >>>>>>>> If it transpires that there is significant benefit in persisting
> >>>>>>>> assignments
> >>>>>>>> specifically to help smooth assignment in the event of GC change,
> >>>>>>>> it would be quite an easy enhancement. I am not inclined to
> persist
> >>>>>>>> the assignments in this KIP.
> >>>>>>>>
> >>>>>>>> 158. Ah, yes. I see. Of course, I want the names as consistent and
> >>>>>>>> understandable too. I suggest renaming
> >>>>>>>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
> >>>>>>>> I haven’t changed the KIP yet, so let me know if that’s OK.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi, Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for the reply.
> >>>>>>>>>
> >>>>>>>>> 147. " it makes a judgement about whether an assignment received
> is
> >>>>>> equal
> >>>>>>>>> to what it already is using."
> >>>>>>>>> If a client receives an assignment different from what it has, it
> >>>>>>>> indicates
> >>>>>>>>> the end of the rebalance. But how does the client know when the
> >>>>>> rebalance
> >>>>>>>>> starts? In the shareHeartbeat design, the new group epoch is
> >>>> propagated
> >>>>>>>>> together with the new assignment in the response.
> >>>>>>>>>
> >>>>>>>>> 150. It could be a potential concern if each GC change forces
> >>>>>> significant
> >>>>>>>>> assignment changes. Does the default assignor take existing
> >>>> assignments
> >>>>>>>>> into consideration?
> >>>>>>>>>
> >>>>>>>>> 155. Good point. Sounds good.
> >>>>>>>>>
> >>>>>>>>> 158. My concern with the current naming is that it's not clear
> what
> >>>> the
> >>>>>>>>> difference is between ReadShareGroupOffsetsState and
> >>>>>> ReadShareGroupState.
> >>>>>>>>> The state in the latter is also offsets.
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
> >>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jun,
> >>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>
> >>>>>>>>>> 147. Perhaps the easiest is to take a look at the code in
> >>>>>>>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl.
> >>>>>>>>>> This class is part of the new consumer group protocol
> >>>>>>>>>> code in the client. It makes state transitions based on
> >>>>>>>>>> the heartbeat requests and responses, and it makes a
> >>>>>>>>>> judgement about whether an assignment received is
> >>>>>>>>>> equal to what it already is using. When a state transition
> >>>>>>>>>> is deemed to be the beginning or end of a rebalance
> >>>>>>>>>> from the point of view of this client, it counts towards the
> >>>>>>>>>> rebalance metrics.
> >>>>>>>>>>
> >>>>>>>>>> Share groups will follow the same path.
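> >>>>>>>>>>
> >>>>>>>>>> Conceptually, the metric recording is no more than this (a
> >>>>>>>>>> much-simplified illustration of the idea, not the actual client
> >>>>>>>>>> code; the enum is a subset of the real states):
> >>>>>>>>>>
> >>>>>>>>>>   class RebalanceRecorder {
> >>>>>>>>>>       enum MemberState { STABLE, RECONCILING }   // illustrative subset
> >>>>>>>>>>       private long rebalanceStartMs = -1L;
> >>>>>>>>>>
> >>>>>>>>>>       void onTransition(MemberState from, MemberState to) {
> >>>>>>>>>>           if (to == MemberState.RECONCILING) {
> >>>>>>>>>>               rebalanceStartMs = System.currentTimeMillis();   // this member's rebalance begins
> >>>>>>>>>>           } else if (from == MemberState.RECONCILING && rebalanceStartMs >= 0) {
> >>>>>>>>>>               long elapsed = System.currentTimeMillis() - rebalanceStartMs;
> >>>>>>>>>>               // elapsed would be recorded against the rebalance latency/count sensors
> >>>>>>>>>>           }
> >>>>>>>>>>       }
> >>>>>>>>>>   }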
> >>>>>>>>>>
> >>>>>>>>>> 150. I do not consider it a concern. Rebalancing a share group
> >>>>>>>>>> is less disruptive than rebalancing a consumer group. If the
> >>>> assignor
> >>>>>>>>>> has information about existing assignments, it can use it. It is
> >>>>>>>>>> true that this information cannot be replayed from a topic and
> >> will
> >>>>>>>>>> sometimes be unknown as a result.
> >>>>>>>>>>
> >>>>>>>>>> 151. I don’t want to rename TopicPartitionsMetadata to
> >>>>>>>>>> simply TopicPartitions (it’s information about the partitions of
> >>>>>>>>>> a topic) because we then have an array of plurals.
> >>>>>>>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
> >>>>>>>>>>
> >>>>>>>>>> 152. Fixed.
> >>>>>>>>>>
> >>>>>>>>>> 153. It’s the GC. Fixed.
> >>>>>>>>>>
> >>>>>>>>>> 154. The UNKNOWN “state” is essentially a default for situations
> >>>> where
> >>>>>>>>>> the code cannot understand data it received. For example, let’s
> >> say
> >>>>>> that
> >>>>>>>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka
> 4.1
> >>>>>>>>>> introduced another state THINKING, a tool built with Kafka 4.0
> >> would
> >>>>>> not
> >>>>>>>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that
> >> the
> >>>>>>>>>> state was something that it could not understand.
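> >>>>>>>>>>
> >>>>>>>>>> In code terms it is the usual forward-compatibility pattern,
> >>>>>>>>>> roughly (illustrative sketch only):
> >>>>>>>>>>
> >>>>>>>>>>   // Illustrative: an unrecognised state string maps to UNKNOWN rather than failing.
> >>>>>>>>>>   enum GroupState {
> >>>>>>>>>>       EMPTY, STABLE, DEAD, UNKNOWN;
> >>>>>>>>>>
> >>>>>>>>>>       static GroupState parse(String name) {
> >>>>>>>>>>           try {
> >>>>>>>>>>               return valueOf(name.toUpperCase(java.util.Locale.ROOT));
> >>>>>>>>>>           } catch (IllegalArgumentException e) {
> >>>>>>>>>>               return UNKNOWN;   // e.g. a 4.0 tool seeing a hypothetical 4.1 "THINKING" state
> >>>>>>>>>>           }
> >>>>>>>>>>       }
> >>>>>>>>>>   }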
> >>>>>>>>>>
> >>>>>>>>>> 155. No, it’s at the level of the share-partition. If the offsets for
> >>>>>>>>>> just one share-partition are reset, only the state epoch for that
> >>>> partition
> >>>>>> is
> >>>>>>>>>> updated.
> >>>>>>>>>>
> >>>>>>>>>> 156. Strictly speaking, it’s redundant. I think having the
> >>>> StartOffset
> >>>>>>>>>> separate gives helpful clarity and I prefer to retain it.
> >>>>>>>>>>
> >>>>>>>>>> 157. Yes, you are right. There’s no reason why a leader change
> >> needs
> >>>>>>>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the
> >> ShareUpdate.
> >>>>>>>>>>
> >>>>>>>>>> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
> >>>>>>>>>> having “State” in the name makes it clear that this is one of the family
> >>>>>>>>>> of inter-broker RPCs served by the share coordinator. The admin RPCs
> >>>>>>>>>> such as DescribeShareGroupOffsets do not include “State”.
> >>>>>>>>>>
> >>>>>>>>>> 159. Fixed.
> >>>>>>>>>>
> >>>>>>>>>> 160. Fixed.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Andrew
> >>>>>>>>>>
> >>>>>>>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID>
> >> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>
> >>>>>>>>>>> 147. "The measurement is certainly from the point of view of
> the
> >>>>>>>> client,
> >>>>>>>>>>> but it’s driven by sending and receiving heartbeats rather than
> >>>>>> whether
> >>>>>>>>>> the
> >>>>>>>>>>> client triggered the rebalance itself."
> >>>>>>>>>>> Hmm, how does a client know which heartbeat response starts a
> >>>>>>>> rebalance?
> >>>>>>>>>>>
> >>>>>>>>>>> 150. PartitionAssignor takes existing assignments into
> >>>> consideration.
> >>>>>>>>>> Since
> >>>>>>>>>>> GC doesn't persist the assignment for share groups, it means
> that
> >>>>>>>>>>> ShareGroupPartitionAssignor can't reliably depend on existing
> >>>>>>>>>> assignments.
> >>>>>>>>>>> Is that a concern?
> >>>>>>>>>>>
> >>>>>>>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename
> >>>>>>>>>>> TopicPartitionsMetadata and TopicMetadata since there is no
> >>>> metadata?
> >>>>>>>>>>>
> >>>>>>>>>>> 152. ShareGroupMetadataKey: "versions": "3"
> >>>>>>>>>>> The versions should be 11.
> >>>>>>>>>>>
> >>>>>>>>>>> 153. ShareGroupDescription.coordinator(): The description says
> >> "The
> >>>>>>>> share
> >>>>>>>>>>> group coordinator". Is that the GC or SC?
> >>>>>>>>>>>
> >>>>>>>>>>> 154. "A share group has only three states - EMPTY , STABLE and
> >>>> DEAD".
> >>>>>>>>>>> What about UNKNOWN?
> >>>>>>>>>>>
> >>>>>>>>>>> 155. WriteShareGroupState: StateEpoch is at the group level,
> not
> >>>>>>>>>> partition
> >>>>>>>>>>> level, right?
> >>>>>>>>>>>
> >>>>>>>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's
> the
> >>>> same
> >>>>>>>> as
> >>>>>>>>>>> the smallest FirstOffset in StateBatches?
> >>>>>>>>>>>
> >>>>>>>>>>> 157. Every leader change forces a ShareSnapshotValue write to
> >>>> persist
> >>>>>>>> the
> >>>>>>>>>>> new leader epoch. Is that a concern? An alternative is to
> include
> >>>>>>>>>>> leaderEpoch in ShareUpdateValue.
> >>>>>>>>>>>
> >>>>>>>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets.
> Should
> >>>> we
> >>>>>>>>>> rename
> >>>>>>>>>>> it to something like ReadShareGroupStartOffset?
> >>>>>>>>>>>
> >>>>>>>>>>> 159. members are assigned members round-robin => members are
> >>>> assigned
> >>>>>>>>>>> round-robin
> >>>>>>>>>>>
> >>>>>>>>>>> 160. "may called": typo
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield <
> >>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>> Thanks for the reply and sorry for the delay in responding.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 123. Yes, I didn’t quite get your point earlier. The member
> >>>>>>>>>>>> epoch is bumped by the GC when it sends a new assignment.
> >>>>>>>>>>>> When the member sends its next heartbeat, it echoes back
> >>>>>>>>>>>> the member epoch, which will confirm the receipt of the
> >>>>>>>>>>>> assignment. It would send the same member epoch even
> >>>>>>>>>>>> after recovery of a network disconnection, so that should
> >>>>>>>>>>>> be sufficient to cope with this eventuality.
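> >>>>>>>>>>>>
> >>>>>>>>>>>> As a trivial sketch of the check on the GC side (names invented
> >>>>>>>>>>>> for illustration, not actual code):
> >>>>>>>>>>>>
> >>>>>>>>>>>>   // The member echoes back the member epoch it last received. Once it matches
> >>>>>>>>>>>>   // the epoch that was bumped alongside the new assignment, the GC knows the
> >>>>>>>>>>>>   // assignment arrived and can stop repeating it in heartbeat responses.
> >>>>>>>>>>>>   boolean assignmentConfirmed(int epochEchoedByMember, int epochSentWithAssignment) {
> >>>>>>>>>>>>       return epochEchoedByMember == epochSentWithAssignment;
> >>>>>>>>>>>>   }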
> >>>>>>>>>>>>
> >>>>>>>>>>>> 125. Yes, I have added it to the table which now matches
> >>>>>>>>>>>> the text earlier in the KIP. Thanks.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 140. Yes, I have added it to the table which now matches
> >>>>>>>>>>>> the text earlier in the KIP. I’ve also added more detail for
> >>>>>>>>>>>> the case where the entire share group is being deleted.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 141. Yes! Sorry for confusing things.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Back to the original question for this point. To delete a
> share
> >>>>>>>>>>>> group, should the GC write a tombstone for each
> >>>>>>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata
> >>>>>>>>>>>> records. But, deletion of a share group is only possible when
> >>>>>>>>>>>> the group is already empty, so the tombstones will have
> >>>>>>>>>>>> been written as a result of the members leaving the group.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 143. Yes, that’s right.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 147. The measurement is certainly from the point of view
> >>>>>>>>>>>> of the client, but it’s driven by sending and receiving
> >> heartbeats
> >>>>>>>>>>>> rather than whether the client triggered the rebalance itself.
> >>>>>>>>>>>> The client decides when it enters and leaves reconciliation
> >>>>>>>>>>>> of the assignment, and measures this period.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Andrew
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID>
> >>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 123. "Rather than add the group epoch to the
> >>>> ShareGroupHeartbeat, I
> >>>>>>>>>> have
> >>>>>>>>>>>>> decided to go for TopicPartitions in
> ShareGroupHeartbeatRequest
> >>>>>> which
> >>>>>>>>>>>>> mirrors ConsumerGroupHeartbeatRequest."
> >>>>>>>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is
> >>>> that
> >>>>>>>>>> enough
> >>>>>>>>>>>>> for confirming the receipt of the new assignment?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 125. This also means that "Alter share group offsets" needs
> to
> >>>>>> write
> >>>>>>>> a
> >>>>>>>>>>>>> ShareGroupPartitionMetadata record, if the partition is not
> >>>> already
> >>>>>>>>>>>>> initialized.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 140. In the table for "Delete share group offsets", we need
> to
> >>>> add
> >>>>>> a
> >>>>>>>>>> step
> >>>>>>>>>>>>> to write a ShareGroupPartitionMetadata record with
> >>>> DeletingTopics.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the
> >>>>>>>> __consumer_offsets
> >>>>>>>>>>>>> topic, which is a compacted topic, right?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests
> to
> >>>> GC,
> >>>>>>>>>> which
> >>>>>>>>>>>>> then forwards it to SC?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 147. I guess a client only knows the rebalance triggered by
> >>>> itself,
> >>>>>>>> but
> >>>>>>>>>>>> not
> >>>>>>>>>>>>> the ones triggered by other members or topic/partition
> changes?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield <
> >>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>> Thanks for the response.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 123. Of course, ShareGroupHeartbeat started off as
> >>>>>>>>>> ConsumerGroupHeartbeat
> >>>>>>>>>>>>>> and then unnecessary fields were removed. In the network
> issue
> >>>>>> case,
> >>>>>>>>>>>>>> there is not currently enough state being exchanged to be
> sure
> >>>> an
> >>>>>>>>>>>>>> assignment
> >>>>>>>>>>>>>> was received.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat,
> I
> >>>> have
> >>>>>>>>>>>> decided
> >>>>>>>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest
> which
> >>>>>>>> mirrors
> >>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group
> member
> >>>>>> does
> >>>>>>>>>>>>>> confirm the assignment it is using, and that can be used by
> >> the
> >>>> GC
> >>>>>>>> to
> >>>>>>>>>>>>>> safely
> >>>>>>>>>>>>>> stop repeating the assignment in heartbeat responses.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 125. Ah, yes. This is indeed something possible with a
> >> consumer
> >>>>>>>> group
> >>>>>>>>>>>>>> and share groups should support it too. This does of course
> >>>> imply
> >>>>>>>> that
> >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue needs an array of
> partitions,
> >>>> not
> >>>>>>>>>>>>>> just the number.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 140. Yes, good spot. There is an inconsistency here in
> >> consumer
> >>>>>>>> groups
> >>>>>>>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at
> >> the
> >>>>>>>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only
> >>>>>> operates
> >>>>>>>>>>>>>> at the topic level.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at
> >> the
> >>>>>>>>>>>> partition
> >>>>>>>>>>>>>> level only. You can reset them, but if you’re actively
> using a
> >>>>>> topic
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>> a share group, I don’t see why you’d want to delete offsets
> >>>> rather
> >>>>>>>>>> than
> >>>>>>>>>>>>>> reset. If you’ve finished using a topic with a share group
> and
> >>>>>> want
> >>>>>>>> to
> >>>>>>>>>>>>>> clean
> >>>>>>>>>>>>>> up, use delete.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So, I’ve changed the AdminClient.deleteShareGroupOffsets to be
> >>>>>>>>>>>>>> topic-based and the RPCs behind it.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The GC reconciles the cluster state with the
> >>>>>>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>>>>>> to spot deletion of topics and the like. However, when the
> >>>> offsets
> >>>>>>>> for
> >>>>>>>>>>>>>> a topic were deleted manually, the topic very likely still exists, so
> >>>>>>>>>>>>>> reconciliation
> >>>>>>>>>>>>>> alone is not going to be able to continue an interrupted
> >>>> operation
> >>>>>>>>>> that
> >>>>>>>>>>>>>> has started. So, I’ve added DeletingTopics back into
> >>>>>>>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so
> failover
> >>>> of
> >>>>>> a
> >>>>>>>> GC
> >>>>>>>>>>>>>> can continue where it left off rather than leaving fragments
> >>>>>> across
> >>>>>>>>>> the
> >>>>>>>>>>>>>> SCs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 141. That is not required. Because this is not a compacted
> >>>> topic,
> >>>>>> it
> >>>>>>>>>> is
> >>>>>>>>>>>>>> not necessary to write tombstones for every key. As long as
> >>>> there
> >>>>>>>> is a
> >>>>>>>>>>>>>> clear and unambiguous record for the deletion of the group,
> >> that
> >>>>>> is
> >>>>>>>>>>>> enough.
> >>>>>>>>>>>>>> The tombstone for ShareGroupPartitionMetadata is
> theoretically
> >>>> not
> >>>>>>>>>>>>>> required but it’s a single record, rather than one per
> member,
> >>>> so
> >>>>>> I
> >>>>>>>>>>>> prefer
> >>>>>>>>>>>>>> to leave it as a record that the interactions with the SC
> have
> >>>>>> been
> >>>>>>>>>>>>>> completed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 142.
> >>>>>>>>>>>>>> 142.1. It will prompt the user to confirm they want to
> >> continue.
> >>>>>>>>>>>>>> This is in common with `kafka-consumer-groups.sh` which
> >>>>>> historically
> >>>>>>>>>>>>>> has defaulted to --dry-run behaviour, but is due to change
> to
> >>>>>>>>>> prompting
> >>>>>>>>>>>>>> if neither --dry-run nor --execute is specified “in a future
> >>>> major
> >>>>>>>>>>>>>> release”.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 142.2. It should support partition-level reset, but only
> >>>>>> topic-level
> >>>>>>>>>>>>>> delete.
> >>>>>>>>>>>>>> I have updated the usage text accordingly. This is in common
> >>>> with
> >>>>>>>>>>>>>> kafka-consumer-groups.sh.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 142.3. --dry-run displays the operation that would be
> >> executed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to
> the
> >>>>>>>>>>>>>> usage text.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the group
> >>>> coordinator
> >>>>>>>>>>>>>> for this kind of reason.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 144. That’s the default. If you haven’t asked to release or
> >>>>>> reject,
> >>>>>>>> it
> >>>>>>>>>>>>>> accepts.
> >>>>>>>>>>>>>> This is analogous to fetching and committing offsets in a
> >>>> consumer
> >>>>>>>>>>>> group.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 145. I think this is a good idea, but I would prefer to
> defer
> >> it
> >>>>>>>>>> until a
> >>>>>>>>>>>>>> future
> >>>>>>>>>>>>>> metrics KIP that I have planned. In KIP-932, I have added
> >> basic
> >>>>>>>>>> metrics
> >>>>>>>>>>>>>> only.
> >>>>>>>>>>>>>> For example, you’ll see that there’s no concept of lag yet,
> >>>> which
> >>>>>>>>>> surely
> >>>>>>>>>>>>>> will have relevance for share groups. I plan to create and
> >>>> deliver
> >>>>>>>> the
> >>>>>>>>>>>>>> metrics KIP before share groups are declared ready for
> >>>> production.
> >>>>>>>>>>>>>> I want the new metrics to be developed with the experience
> of
> >>>>>>>> running
> >>>>>>>>>>>>>> the code.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 146. Milliseconds. KIP updated.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 147. There is a membership state machine in the client that
> >>>>>>>>>>>>>> changes states as the ShareGroupHeartbeat requests and
> >> responses
> >>>>>>>>>>>>>> flow. The duration of a rebalance will be shorter from the
> >> point
> >>>>>> of
> >>>>>>>>>>>>>> view of the share-group consumer because it doesn’t have to
> >>>> worry
> >>>>>>>>>> about
> >>>>>>>>>>>>>> rebalance callbacks and committing offsets as the partitions
> >>>> move
> >>>>>>>>>>>>>> around, but the overall flow is very similar. So, it’s the
> >> state
> >>>>>>>>>>>>>> transitions
> >>>>>>>>>>>>>> that drive the collection of the rebalance metrics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 148. Strangely, none of the existing uses of
> >>>>>> records-per-request-avg
> >>>>>>>>>>>>>> actually have records-per-request-max. I tend to err on the
> >> side
> >>>>>> of
> >>>>>>>>>>>>>> consistency, but can’t think of any reason not to add this.
> >>>> Done.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 149. There are several error codes for
> >>>>>> WriteShareGroupStateResponse:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re
> >>>> looking
> >>>>>>>>>> for.
> >>>>>>>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC isn’t available.
> >>>>>>>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the
> topic.
> >>>>>>>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this
> group.
> >>>>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for
> >> this
> >>>>>>>>>>>>>> topic-partition.
> >>>>>>>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch.
> >>>>>>>>>>>>>> INVALID_REQUEST - There was a problem with the request.
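> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A caller of the persister would handle these roughly as follows
> >>>>>>>>>>>>>> (a sketch only; the retriable/non-retriable split is illustrative,
> >>>>>>>>>>>>>> the two helpers are invented, and FENCED_STATE_EPOCH is the new
> >>>>>>>>>>>>>> error code this KIP introduces):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>   void handleWriteShareGroupStateError(Errors error) {
> >>>>>>>>>>>>>>       switch (error) {
> >>>>>>>>>>>>>>           case NOT_COORDINATOR:
> >>>>>>>>>>>>>>           case COORDINATOR_NOT_AVAILABLE:
> >>>>>>>>>>>>>>           case COORDINATOR_LOAD_IN_PROGRESS:
> >>>>>>>>>>>>>>               retryAfterFindingCoordinator();   // transient: rediscover the SC and retry
> >>>>>>>>>>>>>>               break;
> >>>>>>>>>>>>>>           case GROUP_ID_NOT_FOUND:
> >>>>>>>>>>>>>>           case UNKNOWN_TOPIC_OR_PARTITION:
> >>>>>>>>>>>>>>           case FENCED_STATE_EPOCH:
> >>>>>>>>>>>>>>           case INVALID_REQUEST:
> >>>>>>>>>>>>>>           default:
> >>>>>>>>>>>>>>               failWrite(error);                 // not retriable as-is
> >>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>   }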
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the response.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 123. I thought the network issue can be covered with the
> >> group
> >>>>>>>> epoch.
> >>>>>>>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up
> >> the
> >>>>>>>> epoch
> >>>>>>>>>>>>>> first,
> >>>>>>>>>>>>>>> but doesn't expose the new epoch to members until the
> >>>> assignment
> >>>>>> is
> >>>>>>>>>>>>>>> complete (including initializing the sharePartitionState).
> >> Once
> >>>>>> the
> >>>>>>>>>>>>>>> assignment is complete, GC includes the bumped up epoch and
> >> the
> >>>>>> new
> >>>>>>>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat
> >>>>>> Request
> >>>>>>>>>>>>>> includes
> >>>>>>>>>>>>>>> the new epoch, it means that the member has received the
> new
> >>>>>>>>>> assignment
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> GC can exclude the assignment in the heartbeatResponse.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a
> >> topic-partition
> >>>>>>>> which
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not
> really
> >>>> part
> >>>>>>>> of
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> group yet. So I think it’s permitted to fail the
> >>>>>>>>>> AlterShareGroupOffsets
> >>>>>>>>>>>>>>> because the topic-partition would not be part of
> >>>>>>>>>> ListShareGroupOffsets
> >>>>>>>>>>>>>>> result."
> >>>>>>>>>>>>>>> A user may want to initialize SPSO (e.g. based on
> timestamp)
> >>>>>> before
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> application is first used. If we reject
> >> AlterShareGroupOffsets
> >>>>>> when
> >>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be
> >>>>>> forced
> >>>>>>>> to
> >>>>>>>>>>>>>> start
> >>>>>>>>>>>>>>> the application with the wrong SPSO first and then reset
> it,
> >>>>>> which
> >>>>>>>>>> will
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> inconvenient.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of
> >>>> SPSO
> >>>>>>>> for
> >>>>>>>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue
> >>>> only
> >>>>>>>>>> tracks
> >>>>>>>>>>>>>>> consecutive partitions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 141. Delete share group: Should GC also write a tombstone
> for
> >>>>>> each
> >>>>>>>>>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 142. kafka-share-groups.sh
> >>>>>>>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is
> >>>>>> specified
> >>>>>>>>>> for
> >>>>>>>>>>>>>>> reset-offsets?
> >>>>>>>>>>>>>>> 142.2 Should it support --delete-offsets and
> --reset-offsets
> >> at
> >>>>>> the
> >>>>>>>>>>>>>>> partition level to match the AdminClient api?
> >>>>>>>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't
> >>>> carry
> >>>>>>>> the
> >>>>>>>>>>>>>>> dry-run flag.
> >>>>>>>>>>>>>>> 142.4 --state [String]: What are the valid state values?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the
> >> share-partition
> >>>>>>>>>> leader.
> >>>>>>>>>>>>>> How
> >>>>>>>>>>>>>>> could a user describe the share group offset when there is
> no
> >>>>>>>> member?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to
> >>>>>> accept
> >>>>>>>>>>>>>> consumed
> >>>>>>>>>>>>>>> messages?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 145. It would be useful to add a broker side metric that
> >>>> measures
> >>>>>>>> the
> >>>>>>>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g.
> >> the
> >>>>>>>>>> fraction
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> the time that SPL is blocked because
> >>>>>>>>>>>>>> group.share.partition.max.record.locks
> >>>>>>>>>>>>>>> is reached.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 146. heartbeat-response-time-max: What's the unit?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 147. rebalance related metrics: How does a share consumer
> >> know
> >>>>>> when
> >>>>>>>>>>>> there
> >>>>>>>>>>>>>>> is a rebalance and how does it measure the rebalance time?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 148. records-per-request-avg: Should we pair it with
> >>>>>>>>>>>>>>> records-per-request-max?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 149. If the shareGroupState is not available, what error
> code
> >>>> is
> >>>>>>>> used
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>> WriteShareGroupStateResponse?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield <
> >>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 123. The GC only sends the assignment in
> ShareGroupHeartbeat
> >>>>>>>>>>>>>>>> response if the member has just joined, or the assignment
> >> has
> >>>>>> been
> >>>>>>>>>>>>>>>> recalculated. This latter condition is met when the GC
> fails
> >>>>>> over.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> With a consumer group, it works a little differently. The
> >>>>>>>>>> heartbeating
> >>>>>>>>>>>>>>>> occurs asynchronously with the reconciliation process in
> the
> >>>>>>>> client.
> >>>>>>>>>>>>>>>> The client has reconciliation callbacks, as well as
> offsets
> >> to
> >>>>>>>>>> commit
> >>>>>>>>>>>>>>>> before it can confirm that revocation has occurred. So, it
> >>>> might
> >>>>>>>> take
> >>>>>>>>>>>>>>>> multiple heartbeats to complete the installation of a new
> >>>> target
> >>>>>>>>>>>>>>>> assignment in the client.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> With a share group, it’s more brutal. The GC sends the
> >>>>>> assignment
> >>>>>>>>>>>>>>>> in a heartbeat response, and the client is assumed to have
> >>>> acted
> >>>>>>>> on
> >>>>>>>>>>>>>>>> it immediately. Since share group assignment is really
> about
> >>>>>>>>>> balancing
> >>>>>>>>>>>>>>>> consumers across the available partitions, rather than
> >> safely
> >>>>>>>>>> handing
> >>>>>>>>>>>>>>>> ownership of partitions between consumers, there is less
> >>>>>>>>>> coordination.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I wonder whether there is one situation that does need
> >>>>>>>> considering.
> >>>>>>>>>>>>>>>> If the network connection between the client and the GC is
> >>>> lost,
> >>>>>>>> it
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>> possible that a response containing the assignment is
> lost.
> >>>>>> Then,
> >>>>>>>>>>>>>>>> the connection will be reestablished, and the assignment
> >> will
> >>>>>> only
> >>>>>>>>>> be
> >>>>>>>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of
> >>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one
> >>>>>>>>>>>>>>>> way to close that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 125. Yes, you are right. The reconciliation that I
> describe
> >> in
> >>>>>> the
> >>>>>>>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets
> >> is
> >>>>>>>> called
> >>>>>>>>>>>>>>>> for a topic-partition which is not yet in
> >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue,
> >>>>>>>>>>>>>>>> it’s not really part of the group yet. So I think it’s
> >>>> permitted
> >>>>>>>> to
> >>>>>>>>>>>> fail
> >>>>>>>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition
> would
> >>>> not
> >>>>>>>> be
> >>>>>>>>>>>>>>>> part of ListShareGroupOffsets result.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition
> >>>> which
> >>>>>> is
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling
> >>>>>>>>>>>>>>>> InitializeShareGroupState to set its offset would be
> >>>> sufficient
> >>>>>>>>>>>> without
> >>>>>>>>>>>>>>>> writing ShareGroupPartitionMetadata again.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 138. Added.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 139. This week, I have been experimenting with using the
> >>>>>> existing
> >>>>>>>>>>>>>>>> consumer metrics with share consumer code. That was my
> plan
> >> to
> >>>>>> get
> >>>>>>>>>>>>>>>> started with metrics. While they work to some extent, I am
> >> not
> >>>>>>>>>>>> entirely
> >>>>>>>>>>>>>>>> happy with the result.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have added a basic set of share consumer-specific
> metrics
> >> to
> >>>>>>>>>> KIP-932
> >>>>>>>>>>>>>>>> which I think is a step in the right direction. I
> >> anticipate a
> >>>>>>>>>> future
> >>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> that defines many more metrics and tackles concepts such
> as
> >>>> what
> >>>>>>>>>>>>>>>> “lag” means for a share group. The metrics I’ve included
> are
> >>>>>>>> simply
> >>>>>>>>>>>>>>>> counting and measuring the operations, which is a good
> >> start.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 54. I think either with or without
> >>>>>>>> InitializingTopics/DeletingTopics
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think
> >> there
> >>>>>> is
> >>>>>>>>>>>>>>>> a deeper point behind what you said. The GC needs to be
> >>>>>> responsive
> >>>>>>>>>>>>>>>> to topic creation and deletion, and addition of partitions
> >> in
> >>>>>>>> cases
> >>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics.
> So,
> >>>> we
> >>>>>>>> are
> >>>>>>>>>>>>>>>> probably not saving anything by having those fields. As a
> >>>>>> result,
> >>>>>>>> I
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated
> >> the
> >>>>>> KIP
> >>>>>>>>>>>>>>>> accordingly. The GC needs to reconcile its view of the
> >>>>>> initialised
> >>>>>>>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and
> >> it
> >>>>>> will
> >>>>>>>>>>>>>>>> initialize or delete share-group state accordingly.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have made one more change to the KIP. The
> >>>>>> SharePartitionAssignor
> >>>>>>>>>>>>>>>> interface has been renamed to
> >>>>>>>>>>>>>>>>
> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor
> >>>> and
> >>>>>>>> it
> >>>>>>>>>>>>>>>> now extends the PartitionAssignor interface. It’s
> >> essentially
> >>>> a
> >>>>>>>>>> marker
> >>>>>>>>>>>>>>>> of which partition assignors can be used with share
> groups.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao
> <j...@confluent.io.INVALID
> >>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to
> the
> >>>>>> GC."
> >>>>>>>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment
> >> to
> >>>> GC
> >>>>>>>> to
> >>>>>>>>>>>>>>>>> avoid GC including the assignment in the heartbeat
> response
> >>>>>>>>>>>>>>>> continuously. I
> >>>>>>>>>>>>>>>>> assume this is done by including the new group epoch in
> the
> >>>>>>>>>> heartbeat
> >>>>>>>>>>>>>>>>> response.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 125. It's possible that the share partition has never
> been
> >>>>>>>>>>>> initialized
> >>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write
> >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it
> would
> >>>>>>>>>>>>>> reinitialize
> >>>>>>>>>>>>>>>>> the share partition and lose the effect of
> >>>>>>>> AlterShareGroupOffsets.
> >>>>>>>>>> If
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> partition has already been initialized and it's recorded
> >>>>>>>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to
> write
> >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata again when handling
> >>>>>>>>>>>> AlterShareGroupOffsets.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 54. "I don’t think there is any redundancy. The
> >>>>>>>>>>>>>> ShareGroupMemberMetadata
> >>>>>>>>>>>>>>>>> does include a list of subscribed topics. However, if
> there
> >>>> is
> >>>>>> a
> >>>>>>>>>>>> period
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> time in which no members are subscribed to a particular
> >>>> topic,
> >>>>>> it
> >>>>>>>>>>>> does
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>> mean that the topic’s ShareGroupState should be
> immediately
> >>>>>>>>>> removed,
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata
> >>>>>> records
> >>>>>>>>>>>>>>>> containing
> >>>>>>>>>>>>>>>>> that topic."
> >>>>>>>>>>>>>>>>> I am still trying to understand the value of
> >>>> InitializingTopics
> >>>>>>>> and
> >>>>>>>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They
> >> are
> >>>>>> used
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> remember the intention of an operation. However, GC still
> >>>> needs
> >>>>>>>> to
> >>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>> the case when the intention is not safely recorded. If GC
> >>>> wants
> >>>>>>>> to
> >>>>>>>>>>>>>>>>> initialize a new topic/partition, a simpler approach is
> for
> >>>> it
> >>>>>> to
> >>>>>>>>>>>> send
> >>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> InitializeShareGroupState to the share coordinator and
> >> after
> >>>>>>>>>>>> receiving
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue
> >> with
> >>>>>> the
> >>>>>>>>>>>>>>>>> initialized partition included in InitializedTopics. This
> >>>>>> saves a
> >>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>> write. It's possible for GC to fail in between the two
> >> steps.
> >>>>>> On
> >>>>>>>>>>>>>>>> failover,
> >>>>>>>>>>>>>>>>> the new GC just repeats the process. The case that you
> >>>>>> mentioned
> >>>>>>>>>>>> above
> >>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics
> >> of
> >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member
> subscribes
> >> to
> >>>>>> it,
> >>>>>>>>>> we
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>> still keep the ShareGroupState as long as the topic still
> >>>>>> exists.
> >>>>>>>>>> The
> >>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>> optimization could be applied to DeletingTopics too.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <
> >>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute
> >> the
> >>>>>>>>>>>>>> assignment
> >>>>>>>>>>>>>>>>>> for every member. However, the impact of re-assignment
> is
> >>>> not
> >>>>>>>> that
> >>>>>>>>>>>>>>>> onerous.
> >>>>>>>>>>>>>>>>>> If the recomputed assignments are the same, which they
> may
> >>>>>> well
> >>>>>>>>>> be,
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>> is no impact on the members at all.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On receiving the new assignment, the member adjusts the
> >>>>>>>>>>>>>> topic-partitions
> >>>>>>>>>>>>>>>>>> in its share sessions, removing those which were revoked
> >> and
> >>>>>>>>>> adding
> >>>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>> which were assigned. It is able to acknowledge the
> records
> >>>> it
> >>>>>>>>>>>> fetched
> >>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>> the partitions which have just been revoked, and it
> >> doesn’t
> >>>>>> need
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> confirm
> >>>>>>>>>>>>>>>>>> the assignment back to the GC.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 125. I don’t think the GC needs to write
> >>>>>>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>>>>>>>>>> when processing AlterShareGroupOffsets. This is because
> >> the
> >>>>>>>>>>>> operation
> >>>>>>>>>>>>>>>>>> happens as a result of an explicit administrative action
> >> and
> >>>>>> it
> >>>>>>>> is
> >>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>> to return a specific error code for each
> topic-partition.
> >>>> The
> >>>>>>>>>> cases
> >>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is
> >>>> added
> >>>>>> or
> >>>>>>>>>>>>>> removed
> >>>>>>>>>>>>>>>>>> from the subscribed topics, or the number of partitions
> >>>>>> changes.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for
> a
> >>>>>>>> cluster
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> prevent
> >>>>>>>>>>>>>>>>>> a group from having an excessively low value. Config
> >> added.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 131. I have changed it to
> >>>>>>>> group.share.partition.max.record.locks.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 136.  When GC failover occurs, the GC gaining ownership
> >> of a
> >>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the __consumer_offsets topic replays the records to
> build
> >>>> its
> >>>>>>>>>> state.
> >>>>>>>>>>>>>>>>>> In the case of a share group, it learns:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> * The share group and its group epoch
> (ShareGroupMetadata)
> >>>>>>>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata)
> >>>>>>>>>>>>>>>>>> * The list of share-partitions
> >> (ShareGroupPartitionMetadata)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It will recompute the assignments in order to respond to
> >>>>>>>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the
> >>>> group
> >>>>>>>>>> epoch.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I will update the KIP accordingly to confirm the
> >> behaviour.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the
> >>>>>>>>>>>>>>>>>> group-coordinator-metrics
> >>>>>>>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs
> >>>> offset
> >>>>>>>>>>>> commit,
> >>>>>>>>>>>>>>>>>> the share group equivalent is performed by the SPL. So,
> >> I’ve
> >>>>>>>>>> grouped
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> concepts which relate to the group in
> >>>>>> group-coordinator-metrics.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The SC reports the metrics in the
> >> share-coordinator-metrics
> >>>>>>>> group.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 137.2: There is one metric in both groups -
> >>>>>> partition-load-time.
> >>>>>>>>>> In
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> SC
> >>>>>>>>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>>>> it refers to the time loading data from the share-group
> >>>> state
> >>>>>>>>>> topic
> >>>>>>>>>>>> so
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC
> >>>>>> group,
> >>>>>>>>>>>>>>>>>> it refers to the time to read the state from the
> >> persister.
> >>>>>>>> Apart
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to
> be
> >>>> very
> >>>>>>>>>>>> close.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Later, for a cluster which is using a custom persister,
> >> the
> >>>>>>>>>>>>>>>>>> share-coordinator
> >>>>>>>>>>>>>>>>>> metrics would likely not be reported, and the persister
> >>>> would
> >>>>>>>> have
> >>>>>>>>>>>> its
> >>>>>>>>>>>>>>>> own
> >>>>>>>>>>>>>>>>>> metrics.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 137.3: Correct. Fixed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the
> >>>> internal
> >>>>>>>>>> topic.
> >>>>>>>>>>>>>>>>>> I’ve tweaked the description.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao
> >> <j...@confluent.io.INVALID
> >>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 123. "The share group does not persist the target
> >>>>>> assignment."
> >>>>>>>>>>>>>>>>>>> What's the impact of this? Everytime that GC fails
> over,
> >> it
> >>>>>>>> needs
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> recompute the assignment for every member. Do we expect
> >> the
> >>>>>>>>>> member
> >>>>>>>>>>>>>>>>>>> assignment to change on every GC failover?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 125. Should the GC also write
> >> ShareGroupPartitionMetadata?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when
> >>>>>>>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds
> >>>> good.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 130. Should we have a
> >>>>>> group.share.min.record.lock.duration.ms
> >>>>>>>> to
> >>>>>>>>>>>>>> pair
> >>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> group.share.max.record.lock.duration.ms?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 131. Sounds good. The name
> >>>>>>>>>> group.share.record.lock.partition.limit
> >>>>>>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>>>>> seem very intuitive. How about something
> >>>>>>>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I
> >> guess
> >>>> it
> >>>>>>>>>> needs
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> compute member reassignment and check if there is any
> new
> >>>>>>>>>>>>>>>> topic/partition
> >>>>>>>>>>>>>>>>>>> matching the share subscription. Does it bump up the
> >> group
> >>>>>>>> epoch?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 137. Metrics:
> >>>>>>>>>>>>>>>>>>> 137.1 It would be useful to document who reports each
> >>>> metric.
> >>>>>>>> Is
> >>>>>>>>>> it
> >>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>> broker, GC, SC or SPL?
> >>>>>>>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at
> >> SPL
> >>>> or
> >>>>>>>> SC?
> >>>>>>>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the
> >>>> share-group
> >>>>>>>>>> state
> >>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>> the share-group state partitions loaded in the last 30
> >>>>>>>> seconds."
> >>>>>>>>>>>>>>>>>>> The window depends on metrics.num.samples and
> >>>>>>>>>>>>>> metrics.sample.window.ms
> >>>>>>>>>>>>>>>>>>> and is not always 30 seconds, right?
> >>>>>>>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more?
> >>>> Does
> >>>>>> it
> >>>>>>>>>>>>>> include
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> time to write to the internal topic?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>> Thanks for your comments.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 120. Thanks. Fixed.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which
> >>>> snapshot
> >>>>>>>>>>>>>>>>>>>> the update applies to. It should of course be the
> >> snapshot
> >>>>>>>> that
> >>>>>>>>>>>>>>>> precedes
> >>>>>>>>>>>>>>>>>>>> it in the log. It’s just there to provide a
> consistency
> >>>>>> check.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing
> >>>>>> StateEpoch.
> >>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>> isn’t any more.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue
> >> includes
> >>>>>>>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact,
> there
> >> is
> >>>>>>>>>>>>>>>> considerable
> >>>>>>>>>>>>>>>>>>>> divergence between the KIP and the code for this
> record
> >>>>>> value
> >>>>>>>>>>>> schema
> >>>>>>>>>>>>>>>>>>>> which I expect will be resolved when the migration
> code
> >>>> has
> >>>>>>>> been
> >>>>>>>>>>>>>>>>>>>> completed.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 123. The share group does not persist the target
> >>>> assignment.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 124. Share groups have three kinds of record:
> >>>>>>>>>>>>>>>>>>>> i) ShareGroupMetadata
> >>>>>>>>>>>>>>>>>>>> - this contains the group epoch and is written
> whenever
> >>>> the
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>> epoch changes.
> >>>>>>>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata
> >>>>>>>>>>>>>>>>>>>> - this does not contain the group epoch.
> >>>>>>>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata
> >>>>>>>>>>>>>>>>>>>> - this currently contains the epoch, but I think that
> is
> >>>>>>>>>>>>>> unnecessary.
> >>>>>>>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata
> >>>> definition
> >>>>>>>>>>>>>>>>>>>> contains the group epoch, but the value appears never
> to
> >>>> be
> >>>>>>>> set.
> >>>>>>>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is
> >>>> removing
> >>>>>>>> it.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I have removed the Epoch from
> >> ShareGroupPartitionMetadata.
> >>>>>>>>>>>>>>>>>>>> The only purpose of persisting the epoch for a share
> share
> >>>>>> group
> >>>>>>>>>> is
> >>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> when a group coordinator takes over the share group,
> it
> >> is
> >>>>>>>> able
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> continue the sequence of epochs.
> >>>>>> ShareGroupMetadataValue.Epoch
> >>>>>>>>>>>>>>>>>>>> is used for this.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case
> >> and
> >>>>>>>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP
> >>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted
> >> when
> >>>> it
> >>>>>>>> has
> >>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>> members, so the tombstones for
> ShareGroupMemberMetadata
> >>>> will
> >>>>>>>>>>>>>>>>>>>> have been written when the members left. I have
> >> clarified
> >>>>>>>> this.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group
> >> epoch.
> >>>>>>>> When
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> group coordinator is initializing the share-group
> state
> >>>> the
> >>>>>>>>>> first
> >>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> a share-partition is being added to an assignment in
> the
> >>>>>>>> group,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the
> group
> >>>>>> epoch
> >>>>>>>>>>>>>>>>>>>> increases over time, the share coordinator is entirely
> >>>>>>>> unaware.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> When the first consumer for a share-partition fetches
> >>>>>> records
> >>>>>>>>>>>> from a
> >>>>>>>>>>>>>>>>>>>> share-partition leader, the SPL calls the share
> >>>> coordinator
> >>>>>> to
> >>>>>>>>>>>>>>>>>>>> ReadShareGroupState. If the SPL has previously read
> the
> >>>>>>>>>>>> information
> >>>>>>>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms
> >>>> it's
> >>>>>> up
> >>>>>>>>>> to
> >>>>>>>>>>>>>> date
> >>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Even if many consumers are joining at the same time,
> any
> >>>>>>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>>>>>> which is being initialized will not be included in
> their
> >>>>>>>>>>>>>> assignments.
> >>>>>>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>>>>>>>> the initialization is complete, the next rebalance
> will
> >>>>>> assign
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>> to some consumers which will discover this by
> >>>>>>>>>> ShareGroupHeartbeat
> >>>>>>>>>>>>>>>>>>>> response. And then, the fetching begins.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s
> read
> >>>> the
> >>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait
> up
> >> to
> >>>>>>>>>>>> MaxWaitMs
> >>>>>>>>>>>>>>>>>>>> and then it can return an empty set of records if it’s
> >>>> still
> >>>>>>>> not
> >>>>>>>>>>>>>>>> ready.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a
> >>>> topic
> >>>>>>>> with
> >>>>>>>>>>>>>> many
> >>>>>>>>>>>>>>>>>>>> partitions is added to the subscribed topics for a
> share
> >>>>>>>> group,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> fact
> >>>>>>>>>>>>>>>>>>>> that the assignments will only start to include the
> >>>>>> partitions
> >>>>>>>>>> as
> >>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>> initialization completes should soften the impact.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature
> when
> >>>> it’s
> >>>>>>>>>>>>>> finished
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and
> >>>>>>>>>>>>>> `group.version`.
> >>>>>>>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the
> >>>>>>>>>>>>>>>> `group.share.enable`
> >>>>>>>>>>>>>>>>>>>> configuration will turn it on.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I have described `group.share.enable` as an internal
> >>>>>>>>>> configuration
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> the KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms`
> >>>>>> applies
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> groups
> >>>>>>>>>>>>>>>>>>>> which do not specify a group-level configuration for
> >> lock
> >>>>>>>>>>>> duration.
> >>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>> minimum and maximum for this configuration are
> intended
> >> to
> >>>>>>>> give
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> sensible bounds.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> If a group does specify its own `
> >>>>>>>>>>>>>> group.share.record.lock.duration.ms
> >>>>>>>>>>>>>>>> `,
> >>>>>>>>>>>>>>>>>>>> the broker-level `
> >> group.share.max.record.lock.duration.ms
> >>>> `
> >>>>>>>>>> gives
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value
> >> for
> >>>>>> all
> >>>>>>>>>>>>>> groups.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> While editing, I renamed `
> >>>>>>>>>> group.share.record.lock.duration.max.ms
> >>>>>>>>>>>> `
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for
> >> consistency
> >>>>>>>> with
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> rest of the min/max configurations.
> >>>>>>>>>>>>>>>>>>>>
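
A minimal sketch of how these lock-duration settings fit together, using purely
illustrative values (only the property names come from the KIP discussion above):

    # Broker-level default applied to share groups with no override (illustrative value)
    group.share.record.lock.duration.ms=30000

    # Broker-level ceiling that bounds any group-level override (illustrative value)
    group.share.max.record.lock.duration.ms=60000

    # Group-level override for one share group; it must not exceed the ceiling above
    group.share.record.lock.duration.ms=45000
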
> >>>>>>>>>>>>>>>>>>>> 131. This is the limit per partition so you can go
> wider
> >>>>>> with
> >>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>>>>>> I have set the initial value low for safety. I expect
> to
> >>>> be
> >>>>>>>> able
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> increase this
> >>>>>>>>>>>>>>>>>>>> significantly when we have mature code which has been
> >>>>>>>>>>>> battle-tested.
> >>>>>>>>>>>>>>>>>>>> Rather than try to guess how high it can safely go,
> I’ve
> >>>>>> erred
> >>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> side
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> caution and expect to open it up in a future KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two
> >>>> group
> >>>>>>>>>>>>>>>>>>>> configurations,
> >>>>>>>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms
> and
> >>>>>>>>>>>>>>>>>>>> group.share.heartbeat.timeout.ms . The configurations
> >> you
> >>>>>>>>>>>> mentioned
> >>>>>>>>>>>>>>>>>>>> are the bounds for the group-level configurations.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to
> >> mirror
> >>>>>> the
> >>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>> `group.consumer.max.size`.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid
> >>>>>> assignors
> >>>>>>>>>> for
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> cluster.
> >>>>>>>>>>>>>>>>>>>> When the assignors are configurable at the group
> level,
> >>>> the
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>> will only be permitted to name an assignor which is in
> >>>> this
> >>>>>>>>>> list.
> >>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>> is no group configuration for assignor, so all groups
> >> get
> >>>>>> the
> >>>>>>>>>> one
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>> assignor in the list.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a
> >> cluster
> >>>>>>>> with a
> >>>>>>>>>>>>>> small
> >>>>>>>>>>>>>>>>>>>> number of
> >>>>>>>>>>>>>>>>>>>> brokers and a lot of share group activity, it may
> be
> >>>>>>>>>>>> appropriate
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> increase
> >>>>>>>>>>>>>>>>>>>> this. We will be able to give tuning advice once we
> have
> >>>>>>>>>>>> experience
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> performance impact of increasing it.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao
> >>>> <j...@confluent.io.INVALID
> >>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 120. There is still reference to
> >>>> ConsumerGroupMetadataKey.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change
> >> it
> >>>>>>>> since
> >>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>> not a
> >>>>>>>>>>>>>>>>>>>>> snapshot?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch,
> >> but
> >>>>>>>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know
> how
> >>>> the
> >>>>>>>>>> epoch
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in
> >> the
> >>>>>>>> share
> >>>>>>>>>>>>>>>> group?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 123. There is no equivalent of
> >>>>>>>>>>>> ConsumerGroupTargetAssignmentMember
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> ShareGroup. How does the shareGroup persist the
> member
> >>>>>>>>>>>> assignment?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a
> topic-partition
> >> is
> >>>>>>>>>>>> assigned
> >>>>>>>>>>>>>>>> to a
> >>>>>>>>>>>>>>>>>>>>> member of a share group for the first time, the group
> >>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>> writes
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the
> >>>>>> __consumer_offsets
> >>>>>>>>>>>>>> topic".
> >>>>>>>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata
> with
> >>>> the
> >>>>>>>>>>>> bumped
> >>>>>>>>>>>>>>>>>>>> epoch?
> >>>>>>>>>>>>>>>>>>>>> In general, is there a particular order to bump up
> the
> >>>>>> epoch
> >>>>>>>> in
> >>>>>>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the
> >>>>>>>>>>>>>>>> sharePartitionLeader?
> >>>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>> would be useful to make this clear in other state
> >>>> changing
> >>>>>>>>>>>>>> operations
> >>>>>>>>>>>>>>>>>>>> too.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and
> >>>> share
> >>>>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>>>> Only empty share groups support this operation. The
> >> group
> >>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>>> sends
> >>>>>>>>>>>>>>>>>>>>> an InitializeShareGroupState  request to the share
> >>>>>>>> coordinator.
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the
> new
> >>>>>> state
> >>>>>>>>>>>> epoch
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> __share_group_state  topic."
> >>>>>>>>>>>>>>>>>>>>> Does the operation need to write
> >>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also
> need
> >> to
> >>>>>>>>>> write a
> >>>>>>>>>>>>>>>>>>>> tombstone
> >>>>>>>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new
> consumer
> >>>>>>>> joining
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the
> >>>>>>>>>>>>>> ShareCoordinator
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the
> >>>>>>>>>> SharePartition
> >>>>>>>>>>>>>>>> leader
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> reinitialize the state with
> ReadShareGroupOffsetsState.
> >>>> If
> >>>>>>>> many
> >>>>>>>>>>>>>>>>>> consumers
> >>>>>>>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would
> >>>> there
> >>>>>> be
> >>>>>>>>>> too
> >>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>> load
> >>>>>>>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the
> >>>>>>>>>>>> group.version
> >>>>>>>>>>>>>>>>>>>>> feature, should we make it an internal config?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this
> seems
> >>>>>>>>>> redundant
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>> group.share.enable?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 130. Why do we have both
> >>>>>> group.share.record.lock.duration.ms
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with
> its
> >>>> own
> >>>>>>>> max
> >>>>>>>>>>>>>>>> value?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults
> >> to
> >>>>>> 200.
> >>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> limits
> >>>>>>>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right?
> >> If
> >>>>>>>> there
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>> records per batch, it could be even smaller.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 132. Why do we need all three of
> >>>>>>>>>> group.share.session.timeout.ms,
> >>>>>>>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and
> >>>>>>>>>>>>>>>>>>>> group.share.max.session.timeout.ms?
> >>>>>>>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for
> >>>>>>>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 133. group.share.max.size: Would
> >>>>>>>>>>>> group.share.max.members.per.group
> >>>>>>>>>>>>>>>> be a
> >>>>>>>>>>>>>>>>>>>>> more intuitive name?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a
> >>>> list?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share
> >>>>>> coordinator
> >>>>>>>>>> or
> >>>>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>>>> broker?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to
> >>>>>>>>>> FirstOffset.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP.
> >> Personally,
> >>>> I
> >>>>>>>>>> don’t
> >>>>>>>>>>>>>> mind
> >>>>>>>>>>>>>>>>>>>> having
> >>>>>>>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 114. Done.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record
> >>>> because
> >>>>>>>>>> there
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>>> data to consume, committing on every record is OK.
> >> It’s
> >>>>>>>>>>>>>> inefficient
> >>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>> acceptable.
> >>>>>>>>>>>>>>>>>>>>>> If the first poll returns just one record, but many
> >> more
> >>>>>>>> have
> >>>>>>>>>>>>>> piled
> >>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>>>>>>> the first one was being processed, the next poll has
> >> the
> >>>>>>>>>>>>>> opportunity
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be
> >>>>>>>> committed
> >>>>>>>>>>>>>>>>>> together.
> >>>>>>>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to
> >>>> return
> >>>>>>>>>>>> batches
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> records when the records are available is the
> approach
> >>>> we
> >>>>>>>> will
> >>>>>>>>>>>>>> take
> >>>>>>>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>>>>>>>
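
As a client-side illustration of the batching described above, here is a minimal
sketch of a share consumer loop. It assumes the KafkaShareConsumer API as described
in the KIP (explicit acknowledgement, one commit per poll); the bootstrap address,
group ID, topic name and processing step are hypothetical:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;

    public class ShareConsumeLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // hypothetical
            props.put("group.id", "my-share-group");            // hypothetical
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
                consumer.subscribe(List.of("my-topic"));         // hypothetical topic
                while (true) {
                    // A poll that returns many records lets all of their
                    // acknowledgements be committed together below.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());      // stand-in for real processing
                        consumer.acknowledge(record);            // accept this delivery
                    }
                    consumer.commitSync();                       // one commit for the whole batch
                }
            }
        }
    }
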
> >>>>>>>>>>>>>>>>>>>>>> 116. Good idea. Done.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration
> >> section.
> >>>>>> Let
> >>>>>>>> me
> >>>>>>>>>>>>>> know
> >>>>>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>>>> you think.
> >>>>>>>>>>>>>>>>>>>>>> I am discussing the configuration to enable the
> >> feature
> >>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>>>> mailing
> >>>>>>>>>>>>>>>>>>>>>> list with
> >>>>>>>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in
> >>>> this
> >>>>>>>> area
> >>>>>>>>>>>>>>>> still.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao
> >>>>>> <j...@confluent.io.INVALID
> >>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple
> >>>> times
> >>>>>>>>>> when
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same
> state
> >>>>>> epoch
> >>>>>>>>>> was
> >>>>>>>>>>>>>>>>>> reused
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> initialize the state."
> >>>>>>>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure
> that
> >>>> it
> >>>>>>>> has
> >>>>>>>>>>>>>>>> received
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused?
> >>>>>>>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger
> a
> >>>>>>>>>> rebalance"
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group
> >>>> epoch
> >>>>>>>> to
> >>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> bumped
> >>>>>>>>>>>>>>>>>>>>>> too?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become
> >>>>>> FirstOffset
> >>>>>>>>>> in
> >>>>>>>>>>>>>> ALL
> >>>>>>>>>>>>>>>>>>>>>> schemas
> >>>>>>>>>>>>>>>>>>>>>>> defined in the KIP."
> >>>>>>>>>>>>>>>>>>>>>>> Yes, that seems better to me.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for
> >> 3."
> >>>>>>>>>>>>>>>>>>>>>>> Should we document it?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer
> >> configuration
> >>>> is
> >>>>>>>>>>>>>>>> deprecated
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the
> >>>> shareConsumer
> >>>>>>>>>>>>>>>>>> configuration.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 115. I am wondering if it's a good idea to always
> >>>> commit
> >>>>>>>> acks
> >>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each
> batch
> >>>> may
> >>>>>>>>>> only
> >>>>>>>>>>>>>>>>>> contain
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>> single record and each poll() only returns a single
> >>>>>> batch.
> >>>>>>>>>> This
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> cause
> >>>>>>>>>>>>>>>>>>>>>>> each record to be committed individually. Is there
> a
> >>>> way
> >>>>>>>> for
> >>>>>>>>>> a
> >>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> optimize this?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated
> >>>> acls?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and
> >> RPCs,
> >>>> it
> >>>>>>>>>> would
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> useful
> >>>>>>>>>>>>>>>>>>>>>>> to document the upgrade process.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch
> >> in
> >>>>>> the
> >>>>>>>>>>>>>> response
> >>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupState. When it becomes a
> >> share-partition
> >>>>>>>>>> leader,
> >>>>>>>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the
> things
> >>>> it
> >>>>>>>>>> learns
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> current state epoch. Then it uses the state epoch
> in
> >>>> all
> >>>>>>>>>>>>>>>> subsequent
> >>>>>>>>>>>>>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to
> >>>> prevent
> >>>>>>>>>>>> writes
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> a previous state epoch, which are very unlikely
> but
> >>>>>> which
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>>>>>>>>> that a leader was using an out-of-date epoch and
> was
> >>>>>>>> likely
> >>>>>>>>>> no
> >>>>>>>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>>>>>>>>>> the current leader at all, perhaps due to a long
> >> pause
> >>>>>> for
> >>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>> reason.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the
> SPSO,
> >>>>>>>>>> wouldn’t
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to
> avoid
> >>>> yet
> >>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>>>>> inter-broker
> >>>>>>>>>>>>>>>>>>>>>>>> hop.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share
> >>>> group
> >>>>>>>>>>>> offset
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> altered
> >>>>>>>>>>>>>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the
> group
> >>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>> WILL
> >>>>>>>>>>>>>>>>>>>>>>>> update the state epoch. I don’t think it needs to
> >>>> update
> >>>>>>>> the
> >>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>>>>>>>>>> at the same time (although it could) because the
> >> group
> >>>>>>>> epoch
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> been bumped when the group became empty. If the
> >> share
> >>>>>>>> group
> >>>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains
> >>>> empty,
> >>>>>> it
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to
> >>>>>> initialize
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> state.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO
> as
> >> a
> >>>>>>>> result
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> the usual flow of record delivery, it does not
> >> update
> >>>>>> the
> >>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> epoch.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the
> >>>>>>>> alteration
> >>>>>>>>>>>>>>>> because,
> >>>>>>>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response
> >> will
> >>>>>>>>>> contain
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to
> >> be
> >>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> last-resort way of catching this.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first
> >>>>>>>> ShareFetch
> >>>>>>>>>>>>>>>>>> request,
> >>>>>>>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to
> >>>>>>>>>>>>>>>> ReadShareGroupState.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain
> >>>> constant,
> >>>>>>>>>> but,
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might
> >>>>>> change.
> >>>>>>>>>> As a
> >>>>>>>>>>>>>>>>>> result,
> >>>>>>>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share
> >>>>>>>> sessions
> >>>>>>>>>>>>>>>>>>>> transitions
> >>>>>>>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the
> >> share
> >>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>> transitioning
> >>>>>>>>>>>>>>>>>>>>>>>> from empty to non-empty, for the share-partition
> >>>> leader
> >>>>>> to
> >>>>>>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state
> >>>> epoch.
> >>>>>> If
> >>>>>>>>>> its
> >>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>>>> epoch is out of date, it can then
> >> ReadShareGroupState
> >>>> to
> >>>>>>>>>>>>>>>>>>>> re-initialize.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to
> FirstOffset,
> >>>> we
> >>>>>>>> need
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having
> >>>>>> reviewed
> >>>>>>>>>> all
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should
> become
> >>>>>>>>>>>> FirstOffset
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset
> is
> >>>> just
> >>>>>>>>>> used
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> record batches, which is already a known concept.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Please let me know if you agree.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level
> >> index
> >>>>>> for
> >>>>>>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users
> >> will
> >>>> be
> >>>>>>>>>> using
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> console share consumer. When I use the console
> >>>>>> consumer, I
> >>>>>>>>>>>>>> always
> >>>>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default
> >> group
> >>>>>> ID
> >>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> console
> >>>>>>>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to
> match
> >>>> the
> >>>>>>>>>>>> console
> >>>>>>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>>>>>> better and give more of an idea where this
> >> mysterious
> >>>>>>>> group
> >>>>>>>>>>>> has
> >>>>>>>>>>>>>>>> come
> >>>>>>>>>>>>>>>>>>>>>> from.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use
> >>>>>> compaction
> >>>>>>>>>> and
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better
> course
> >>>> for
> >>>>>>>>>>>> KIP-932.
> >>>>>>>>>>>>>>>>>>>>>>>> Personally,
> >>>>>>>>>>>>>>>>>>>>>>>> until I’ve written it down and lived with the
> ideas
> >>>> for
> >>>>>> a
> >>>>>>>>>> few
> >>>>>>>>>>>>>>>> days,
> >>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>> won’t be
> >>>>>>>>>>>>>>>>>>>>>>>> able to choose which I prefer.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by
> the
> >>>> end
> >>>>>> of
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> week.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs
> >>>>>> matches
> >>>>>>>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs
> >> from
> >>>>>>>>>> KIP-848.
> >>>>>>>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The
> >>>>>>>>>> ShareGroupHeartbeatResponse
> >>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>>>>>> originally
> >>>>>>>>>>>>>>>>>>>>>>>> created from KIP-848. This part of the schema does
> >> not
> >>>>>>>> apply
> >>>>>>>>>>>>>> and I
> >>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> removed
> >>>>>>>>>>>>>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to
> >>>>>> simply
> >>>>>>>>>>>>>>>>>>>>>> TopicPartitions
> >>>>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>> aligns with the actual definition of
> >>>>>>>>>>>>>>>> ConsumerGroupHeartbeatResponse.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error
> >> codes
> >>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to
> >> these
> >>>>>> RPCs
> >>>>>>>>>> as
> >>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>> suggest.
> >>>>>>>>>>>>>>>>>>>>>>>> It’s a good improvement for problem determination.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain.
> Actually,
> >> 1
> >>>> is
> >>>>>>>>>>>>>> Acquired
> >>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>> is not persisted, and I have another non-terminal
> >>>> state
> >>>>>> in
> >>>>>>>>>>>> mind
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of
> >> whether
> >>>>>>>>>>>>>>>>>>>> OffsetAndMetadata
> >>>>>>>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional
> >>>>>> Metadata
> >>>>>>>>>> part
> >>>>>>>>>>>>>>>> comes
> >>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>> a regular consumer commits offsets. However, it is
> >>>>>> correct
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>> never be metadata with a share group. I have
> changed
> >>>>>>>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this
> >>>>>> process
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>>>> bump
> >>>>>>>>>>>>>>>>>>>>>>>> can be a logical not just a physical change to the
> >>>>>> schema.
> >>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all
> >> of
> >>>>>> the
> >>>>>>>>>>>> states
> >>>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> point.
> >>>>>>>>>>>>>>>>>>>>>>>> I think there is definitely scope for another KIP
> >>>>>> focused
> >>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> administration
> >>>>>>>>>>>>>>>>>>>>>>>> of share groups that might want this information
> so
> >>>>>>>> someone
> >>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>> build
> >>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> UI and other tools on top. Doing that well is
> quite
> >> a
> >>>>>> lot
> >>>>>>>> of
> >>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>> own right
> >>>>>>>>>>>>>>>>>>>>>>>> so I would prefer not to do that now.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 109. Yes, they’re inconsistent and follow the
> >> consumer
> >>>>>>>> groups
> >>>>>>>>>>>>>>>>>>>> equivalents
> >>>>>>>>>>>>>>>>>>>>>>>> which are also inconsistent. In general, the
> >>>>>>>>>>>> KafkaAdmin.delete….
> >>>>>>>>>>>>>>>>>>>> Methods
> >>>>>>>>>>>>>>>>>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like
> >>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain
> >> consistent
> >>>>>> with
> >>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>>>> groups?
> >>>>>>>>>>>>>>>>>>>>>>>> Happy to change it.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level
> >> of
> >>>>>>>> detail
> >>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>> epochs.
> >>>>>>>>>>>>>>>>>>>>>>>> The MemberDescription does include the assignment,
> >> but
> >>>>>> not
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> list
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> subscribed topic names.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing
> >>>> because
> >>>>>>>>>>>> there’s
> >>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>>> single class that includes the states of all group
> >>>>>> types.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 112. I think it’s good practice for the API to
> have
> >>>>>>>>>>>>>>>>>>>>>>>> ListShareGroupOffsetSpec.
> >>>>>>>>>>>>>>>>>>>>>>>> It makes evolution and extension of the API much
> >>>> easier.
> >>>>>>>>>> Also,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> matches
> >>>>>>>>>>>>>>>>>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec.
> >>>>>>>>>>>>>>>>>>>>>>>>
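
For reference, the existing consumer-group precedent mentioned here looks like the
following, and ListShareGroupOffsetsSpec is assumed to mirror this shape (the group
ID, topic and partition are hypothetical):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ListOffsetsWithSpec {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // hypothetical

            // The per-group spec object is what makes the API easy to extend later,
            // which is the same reasoning applied to ListShareGroupOffsetsSpec.
            ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec()
                .topicPartitions(List.of(new TopicPartition("my-topic", 0)));  // hypothetical

            try (Admin admin = Admin.create(props)) {
                Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets(Map.of("my-group", spec))        // hypothetical group
                    .partitionsToOffsetAndMetadata("my-group")
                    .get();
                offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
            }
        }
    }
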
> >>>>>>>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResults.errors() is the
> >> same. I
> >>>>>>>> think
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> to look in the exception details and the same
> >> pattern
> >>>> is
> >>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>>> followed
> >>>>>>>>>>>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to
> writing
> >> a
> >>>>>>>>>> proposal
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>>>>>> share-group state that doesn’t use log compaction.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine
> Olshan
> >> on
> >>>>>>>>>>>>>>>>>> read-committed
> >>>>>>>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the detailed review.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao
> >>>>>>>> <j...@confluent.io.INVALID
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the
> group
> >>>>>> epoch
> >>>>>>>>>> to
> >>>>>>>>>>>>>> set
> >>>>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
> >>>>>>>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group
> >>>> coordinator
> >>>>>>>>>>>>>>>> initialize
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It
> >> seems
> >>>>>>>>>> simpler
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> partition leader initialize both the state and
> the
> >>>> SPSO
> >>>>>>>>>>>>>> together?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be
> >>>> bumped
> >>>>>>>> when
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>>>> offset is altered."
> >>>>>>>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group
> offsets
> >>>> says
> >>>>>>>>>> "The
> >>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with
> >> the
> >>>>>> new
> >>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> __share_group_state  topic." So, which is
> correct?
> >> We
> >>>>>>>> have
> >>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>> paths
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> update the state in the share coordinator, one
> from
> >>>> the
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>>>>>>>> and another from the partition leader. I thought
> >> the
> >>>>>>>>>> benefit
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> bumping
> >>>>>>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>>>>>> the epoch is to fence off a late request in the
> >>>>>> previous
> >>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>>>>>> path.
> >>>>>>>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share
> >>>> group
> >>>>>>>>>> offset
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>>>>>> coordinator, how does the partition leader know
> the
> >>>>>> share
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>>>>>>> been altered so that it could clear its in-memory
> >>>>>> state?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base
> >>>> offset
> >>>>>>>> for
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> batch
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and
> >> matches
> >>>>>>>>>>>>>> LastOffset.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in
> the
> >>>> top
> >>>>>>>>>> level
> >>>>>>>>>>>>>>>> index
> >>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>> Kafka protocol changes?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based
> >>>>>> expiration
> >>>>>>>>>>>>>> later.
> >>>>>>>>>>>>>>>>>> But
> >>>>>>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>>>> a default group in console-share-consumer
> probably
> >>>>>> won't
> >>>>>>>>>> help
> >>>>>>>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> garbage. In the common case, the user of the
> >> console
> >>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>>>>>> wants
> >>>>>>>>>>>>>>>>>>>>>>>>> to see the recently produced records for
> >>>> verification.
> >>>>>> If
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>>>> doesn't provide that (because of the stale
> state),
> >>>> the
> >>>>>>>> user
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>>>>>>> just use a new group. It's true that we don't
> >> garbage
> >>>>>>>>>> collect
> >>>>>>>>>>>>>>>> idle
> >>>>>>>>>>>>>>>>>>>>>>>> topics.
> >>>>>>>>>>>>>>>>>>>>>>>>> However,  the share groups are similar to
> >> consumers,
> >>>>>>>> which
> >>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>>>>>> garbage collection. Typically, we read topics
> more
> >>>> than
> >>>>>>>>>>>>>> creating
> >>>>>>>>>>>>>>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we
> >>>> could
> >>>>>>>> use
> >>>>>>>>>>>>>>>>>>>> customized
> >>>>>>>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems
> >>>>>> cleaner
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups
> >> fit
> >>>> in
> >>>>>>>>>>>> memory,
> >>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>> generate snapshots more efficiently than going
> >>>> through
> >>>>>>>>>>>>>>>> compaction.
> >>>>>>>>>>>>>>>>>>>>>>>> Having a
> >>>>>>>>>>>>>>>>>>>>>>>>> separate log per share partition is probably too
> >> much
> >>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>> It's
> >>>>>>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>>>> efficient to put the state changes for multiple
> >> share
> >>>>>>>>>>>>>> partitions
> >>>>>>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>>>>>> single log.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 100.
> ShareGroupHeartbeatRequest.RebalanceTimeoutMs:
> >>>>>>>> Should
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>> name
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>> SessionTimeoutMs?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 101.
> ShareGroupHeartbeatResponse.Assignment.Error:
> >>>> What
> >>>>>>>>>> kind
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>> we have when assigning partitions? What are the
> >>>>>>>>>> corresponding
> >>>>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>>>>> codes?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 102. Do we still need
> >>>>>>>>>>>>>>>>>>>>>>>>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for
> >>>>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode
> >> and
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the
> errorCode
> >> in
> >>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse,
> >>>>>>>>>>>>>>>>>>>>>>>>> ShareAcknowledgeResponse,
> >>>> ReadShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse,
> >>>>>>>>>> DeleteShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and
> >>>>>>>>>>>>>>>>>>>>>> InitializeShareGroupStateResponse?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 105. "about": "The state -
> >>>>>>>>>> 0:Available,2:Acked,4:Archived.":
> >>>>>>>>>>>>>> What
> >>>>>>>>>>>>>>>>>>>>>> about 1
> >>>>>>>>>>>>>>>>>>>>>>>>> and 3? Are we leaving them out intentionally?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in
> >>>> OffsetAndMetadata?
> >>>>>>>> If
> >>>>>>>>>>>> not,
> >>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>> remove it from AdminClient and
> KafkaShareConsumer?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the
> >> version
> >>>>>>>> since
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>>>>>> supports
> >>>>>>>>>>>>>>>>>>>>>>>>> a new group type "share"?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it
> >>>>>> expose
> >>>>>>>>>> all
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> states
> >>>>>>>>>>>>>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just
> >>>> SPSO?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
> >>>>>>>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final
> >>>>>>>>>> TopicPartition
> >>>>>>>>>>>>>>>>>>>>>> partition)
> >>>>>>>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes
> >>>>>>>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>>
> >> deletedGroups()
> >>>>>>>>>>>>>>>>>>>>>>>>> Should we make them more consistent?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields
> >> like
> >>>>>>>>>>>>>> GroupEpoch,
> >>>>>>>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and
> >>>> SubscribedTopicNames?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could
> we
> >>>>>> just
> >>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we
> know
> >>>>>> which
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>>> error?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the
> >>>> Share
> >>>>>>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>>>>>> RPCs.
> >>>>>>>>>>>>>>>>>>>>>>>>>> In the general case, it’s an inter-broker call.
> It
> >>>> is
> >>>>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>> optimise
> >>>>>>>>>>>>>>>>>>>>>>>>>> for the situation in which the appropriate GC
> and
> >> SC
> >>>>>>>>>> shards
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>>> co-located, but the
> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP does not delve that deep into potential
> >>>>>> performance
> >>>>>>>>>>>>>>>>>>>> optimisations.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea,
> but
> >> I
> >>>> do
> >>>>>>>>>> worry
> >>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>>> retrospectively
> >>>>>>>>>>>>>>>>>>>>>>>>>> introducing a naming convention for groups. I
> feel
> >>>>>> that
> >>>>>>>>>>>> naming
> >>>>>>>>>>>>>>>>>>>>>>>> conventions
> >>>>>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>>>> typically be the responsibility of the cluster
> >>>>>>>>>>>> administrators
> >>>>>>>>>>>>>>>>>> based
> >>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>>>> organizational
> >>>>>>>>>>>>>>>>>>>>>>>>>> factors, such as the name of an application.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID
> >>>> because
> >>>>>>>> the
> >>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>> ID
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>> correct but
> >>>>>>>>>>>>>>>>>>>>>>>>>> the group is the wrong type. The nearest
> existing
> >>>>>> error
> >>>>>>>>>> code
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> gets
> >>>>>>>>>>>>>>>>>>>>>>>>>> that across
> >>>>>>>>>>>>>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is
> >>>> really
> >>>>>>>>>>>> showing
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>>> error code would be better.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have
> >> removed
> >>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every
> change,
> >>>> but
> >>>>>>>>>> only a
> >>>>>>>>>>>>>>>>>> subset
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>> relevant
> >>>>>>>>>>>>>>>>>>>>>>>>>> for rebalancing. A check is done against the
> names
> >>>> of
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> subscribed
> >>>>>>>>>>>>>>>>>>>>>>>>>> topics to
> >>>>>>>>>>>>>>>>>>>>>>>>>> see if any relevant changes may have occurred.
> >> Then
> >>>>>> the
> >>>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>>>> trigger
> >>>>>>>>>>>>>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change
> >> in
> >>>>>>>>>>>>>> partitions,
> >>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>> rack
> >>>>>>>>>>>>>>>>>>>>>>>>>> IDs for the
> >>>>>>>>>>>>>>>>>>>>>>>>>> replicas. I have updated the KIP to make this
> more
> >>>>>>>> clear.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it
> is
> >>>> much
> >>>>>>>>>> less
> >>>>>>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> assignment survives a GC change. There’s no need
> >> to
> >>>>>>>>>> transfer
> >>>>>>>>>>>>>>>>>>>>>> partitions
> >>>>>>>>>>>>>>>>>>>>>>>>>> safely from
> >>>>>>>>>>>>>>>>>>>>>>>>>> member to member in the way that is required for
> >>>>>>>> consumer
> >>>>>>>>>>>>>>>> groups,
> >>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>>>> optimisation, the assignments for a share group
> >> are
> >>>>>> not
> >>>>>>>>>>>>>>>> persisted.
> >>>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>>>>> wouldn’t do any
> >>>>>>>>>>>>>>>>>>>>>>>>>> harm, but it just seems unnecessary.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to
> >>>> acknowledge
> >>>>>> a
> >>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>>>>>>>>>>>> has the right to acknowledge, the
> >>>> INVALID_RECORD_STATE
> >>>>>>>>>> error
> >>>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> used.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> If the application uses the
> >>>>>>>> KafkaShareConsumer.commitSync
> >>>>>>>>>>>>>>>> method,
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>>>> see an InvalidRecordState exception returned.
> >>>>>>>>>> Alternatively,
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> application can
> >>>>>>>>>>>>>>>>>>>>>>>>>> register an acknowledgement commit callback
> which
> >>>> will
> >>>>>>>> be
> >>>>>>>>>>>>>> called
> >>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> status
> >>>>>>>>>>>>>>>>>>>>>>>>>> of the acknowledgements that have succeeded or
> >>>> failed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
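
A rough sketch of the callback route mentioned above. The callback and the exception
names come from the KIP text, but the exact onComplete parameters and the way the
failure is surfaced are assumptions here:

    // Given a KafkaShareConsumer<String, String> consumer as in the earlier sketch.
    // The (offsets, exception) parameters are an assumed shape for the callback.
    consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
        if (exception instanceof InvalidRecordStateException) {
            // The broker rejected an acknowledgement because this consumer no longer
            // held the delivery (for example, the acquisition lock expired during a
            // long pause). The records will be redelivered to some consumer in the
            // group, so the application can simply carry on.
            System.err.println("Acknowledgement no longer valid: " + exception.getMessage());
        } else if (exception != null) {
            System.err.println("Acknowledgement commit failed: " + exception);
        }
    });
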
> >>>>>>>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with
> the
> >>>>>>>> durable
> >>>>>>>>>>>>>>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>>>>>>>>>>>> state in this
> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP. The significant choices I made are that:
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is
> replicated
> >>>>>>>> between
> >>>>>>>>>>>>>>>> brokers.
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the
> >>>> storage.
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Only one topic is required.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for
> this
> >>>> kind
> >>>>>>>> of
> >>>>>>>>>>>>>> data,
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>>> evidenced by
> >>>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex technique I employed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to
> >>>> improve
> >>>>>>>>>> upon
> >>>>>>>>>>>>>> it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log
> >>>>>>>> compactor
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and
> >>>>>> ShareDelta
> >>>>>>>>>>>>>>>> records.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Essentially,
> >>>>>>>>>>>>>>>>>>>>>>>>>> for each share-partition, the latest
> >> ShareCheckpoint
> >>>>>> and
> >>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>> subsequent
> >>>>>>>>>>>>>>>>>>>>>>>>>> ShareDelta
> >>>>>>>>>>>>>>>>>>>>>>>>>> records must not be cleaned. Anything else can
> be
> >>>>>>>> cleaned.
> >>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>>>>>> be sure
> >>>>>>>>>>>>>>>>>>>>>>>>>> that multiple ShareDelta records with the same
> key
> >>>>>> would
> >>>>>>>>>>>>>> survive
> >>>>>>>>>>>>>>>>>>>>>>>> cleaning
> >>>>>>>>>>>>>>>>>>>>>>>>>> and we could
> >>>>>>>>>>>>>>>>>>>>>>>>>> abandon the DeltaIndex technique.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per
> >>>>>>>> share-partition.
> >>>>>>>>>>>>>> Let’s
> >>>>>>>>>>>>>>>>>>>>>> imagine
> >>>>>>>>>>>>>>>>>>>>>>>>>> that we had
> >>>>>>>>>>>>>>>>>>>>>>>>>> a share-state topic per topic being consumed in
> a
> >>>>>> share
> >>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> same number
> >>>>>>>>>>>>>>>>>>>>>>>>>> of partitions as the topic being consumed. We
> >> could
> >>>>>>>> write
> >>>>>>>>>>>> many
> >>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>>> deltas
> >>>>>>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints
> to
> >>>>>> keep
> >>>>>>>>>>>>>> control
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> storage used.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use
> >>>>>>>>>>>>>>>>>>>>>>>> KafkaAdmin.deleteRecords()
> >>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>> prune all of the older records.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
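
The pruning step in option (b) can be done with an Admin API that exists today. A
minimal sketch, with a hypothetical share-state topic name and checkpoint offset:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class PruneShareState {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");                      // hypothetical
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("share-state-my-topic", 0); // hypothetical
                long checkpointOffset = 12345L; // offset of the latest checkpoint (hypothetical)
                // Everything below the latest checkpoint is superseded and can be deleted.
                admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(checkpointOffset)))
                     .all()
                     .get();
            }
        }
    }
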
> >>>>>>>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous,
> but
> >>>> we
> >>>>>>>> are
> >>>>>>>>>>>>>>>> talking
> >>>>>>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>>>>>> per share group that it’s being consumed in.
> These
> >>>>>>>> topics
> >>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>> compacted.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister
> interface
> >> is
> >>>>>>>>>>>> intended
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>> pluggable one day.
> >>>>>>>>>>>>>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. It
> >> seems
> >>>>>>>> likely
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>> future KIPs will
> >>>>>>>>>>>>>>>>>>>>>>>>>> improve upon it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to
> >>>>>> change
> >>>>>>>>>> this
> >>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>>> While
> >>>>>>>>>>>>>>>>>>>>>>>>>> option (a) is
> >>>>>>>>>>>>>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack
> to
> >>>>>> have
> >>>>>>>> a
> >>>>>>>>>>>>>>>>>> customised
> >>>>>>>>>>>>>>>>>>>>>> log
> >>>>>>>>>>>>>>>>>>>>>>>>>> compactor
> >>>>>>>>>>>>>>>>>>>>>>>>>> just for this topic.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State
> is
> >>>>>>>>>>>> overloaded.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 79. See (77).
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <
> >>>>>>>>>>>>>>>> david.art...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>>> .INVALID>
> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty
> >>>> exciting
> >>>>>>>>>>>> effort.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still
> >> trying
> >>>> to
> >>>>>>>>>> grok
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> whole
> >>>>>>>>>>>>>>>>>>>>>>>>>> thing.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry if some of my questions are basic :)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Concepts:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with
> >> the
> >>>>>>>> Share
> >>>>>>>>>>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>>>>>>> RPC or directly in-process?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular
> >>>>>>>> consumer
> >>>>>>>>>>>>>>>> groups,
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the
> >>>>>>>> operator
> >>>>>>>>>>>>>>>> defines
> >>>>>>>>>>>>>>>>>>>>>> "sg_"
> >>>>>>>>>>>>>>>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>>>>>>>>>>> prefix for share groups only, and if a regular
> >>>>>> consumer
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that name it fails.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group,
> >> or
> >>>> a
> >>>>>>>>>> share
> >>>>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID
> >> make
> >>>>>>>> more
> >>>>>>>>>>>>>> sense
> >>>>>>>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Share Group Membership:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 73. What goes in the Metadata field for
> >>>>>>>>>>>>>> TargetAssignment#Member
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Assignment?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we
> >> rebalance
> >>>>>>>> when
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>>>>>>>>> metadata changes. Would this be for any change,
> >> or
> >>>>>> just
> >>>>>>>>>>>>>> certain
> >>>>>>>>>>>>>>>>>>>> ones?
> >>>>>>>>>>>>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>>>>>>>>>>> example, if a follower drops out of the ISR and
> >>>> comes
> >>>>>>>>>> back,
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>>>>>>>>>> don't need to rebalance.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator
> >> does
> >>>>>>>> *not*
> >>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> assignment" Can you explain why this is not
> >> needed?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 76. " If the consumer just failed to heartbeat
> >> due
> >>>>>> to a
> >>>>>>>>>>>>>>>> temporary
> >>>>>>>>>>>>>>>>>>>>>>>> pause,
> >>>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>> could in theory continue to fetch and
> acknowledge
> >>>>>>>>>> records.
> >>>>>>>>>>>>>> When
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>> finally
> >>>>>>>>>>>>>>>>>>>>>>>>>>> sends a heartbeat and realises it’s been kicked
> >> out
> >>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>>>> stop fetching records because its assignment
> has
> >>>> been
> >>>>>>>>>>>>>> revoked,
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> rejoin
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the group."
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still
> deliver
> >>>> some
> >>>>>>>>>>>>>> buffered
> >>>>>>>>>>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>>>> if the share group coordinator has expired its
> >>>>>> session,
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> wouldn't
> >>>>>>>>>>>>>>>>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>>>>>>>>>>>> acknowledgments for that share consumer. In
> such
> >> a
> >>>>>>>> case,
> >>>>>>>>>> is
> >>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>> kind
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>> error raised to the application like "hey, I
> know
> >>>> we
> >>>>>>>> gave
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>>>>>> records, but really we shouldn't have" ?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> -----
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is
> >>>> written
> >>>>>>>> at
> >>>>>>>>>>>>>> least
> >>>>>>>>>>>>>>>>>>>> every
> >>>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>>>>>> often, could we add a new log compactor that
> >> avoids
> >>>>>>>>>>>>>> compacting
> >>>>>>>>>>>>>>>>>>>>>>>>>> ShareDelta-s
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that are still "active" (i.e., not yet
> superseded
> >>>> by
> >>>>>> a
> >>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>>>> ShareCheckpoint). Mechanically, this could be
> done
> >>>> by
> >>>>>>>>>>>> keeping
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> LSO
> >>>>>>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>>>>>> greater than the oldest "active"
> ShareCheckpoint.
> >>>>>> This
> >>>>>>>>>>>> might
> >>>>>>>>>>>>>>>> let
> >>>>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>>>>>>>> remove
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex thing.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the
> >>>> ShareDelta/Checkpoint
> >>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>>>> MessageState? (State is kind of
> >>>> overloaded/ambiguous)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current
> >>>>>>>> persistence
> >>>>>>>>>>>>>> model
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the share state is stored in one topic. It
> seems
> >>>> like
> >>>>>>>> we
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>> going
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>> storing a lot more state than we do in
> >>>>>>>> __consumer_offsets
> >>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>>> we're
> >>>>>>>>>>>>>>>>>>>>>>>>>>> dealing with message-level acks. With
> aggressive
> >>>>>>>>>>>>>> checkpointing
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>> compaction, we can mitigate the storage
> >>>> requirements,
> >>>>>>>> but
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> throughput
> >>>>>>>>>>>>>>>>>>>>>>>>>>> could be a limiting factor. Have we considered
> >>>> other
> >>>>>>>>>>>>>>>>>> possibilities
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>> persistence?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>
