Hi, Andrew,

Thanks for the reply.

162. It's fine to start with just the group type. Since ListGroups() is a
generic API, I want to make sure that it covers all existing groups.
Currently, GroupType only has "classic" and "consumer", both of which seem
to be related to groups formed by consumers since it's part of
ConsumerGroupDescription. Does ListGroup() return connect based groups and
if so, what's the GroupType? If ListGroup() doesn't cover all groups,
should we name it more accurately?

Jun


On Fri, May 3, 2024 at 7:51 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Jun,
> Thanks for your reply.
>
> 161. ShareGroupListing and ShareGroupDescription are using
> the same pattern as ConsumerGroupListing and
> ConsumerGroupDescription. I have gone for consistency which
> I think is probably best here. It’s what I would expect if I had previously
> used the admin API for consumer groups and was looking to use it for
> share groups. I agree it’s a bit weird.
>
> 162. GroupListing contains the only information which is properly
> in common between a ConsumerGroupListing and a ShareGroupListing.
> ListGroupsResponse.ProtocolType is interpreted to provide the
> group type. I know that the ListGroups RPC also includes the group
> state, but that’s as a string and there’s no common enum for the states
> of all types of group. As a result, I have exposed group type but not
> state on this API.
>
> Previously in the discussion for this KIP, I mentioned that I would
> create another KIP for the administration of groups, in particular
> how the administrator can ensure that particular group IDs
> are used for the group type they desire. At the moment, I think
> keeping ListGroups in this KIP is a good idea. If we actually want
> to make it more sophisticated, perhaps that would be better with
> the group administration KIP.
>
> 163. It will be one higher than the latest version at the time we are
> ready to deliver this feature for real. When we are on the cusp of
> delivery, I’ll update the KIP with the final value.
>
> 164. KRaft only. All the RPCs are “broker” only. None of the code will
> be merged until after 3.8 has branched.
>
> Thanks,
> Andrew
>
> > On 4 May 2024, at 00:12, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply. A few more comments.
> >
> > 161. ShareGroupListing.state() returns an optional, but
> > ShareGroupDescription.state() does not. Should we make them consistent?
> > Also, it seems a bit weird to return optional with an UNKNOWN state.
> >
> > 162. Should GroupListing include ProtocolType and GroupState too?
> >
> > 163. What is the value of group.version to gate the queuing feature?
> >
> > 164. Is the queueing feature only supported on KRaft clusters? For
> example,
> > the feature tool seems to be built only for the KRaft cluster.
> >
> > Jun
> >
> > On Fri, May 3, 2024 at 10:32 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for your reply.
> >>
> >> 147. Yes, I see what you mean. The rebalance latency will indeed
> >> be very short by comparison. I have removed the rebalance latency
> >> metrics from the client and retained the rebalance count and rate.
> >>
> >> 150. Yes, I think so. I have tweaked the text so that the simple
> >> assignor will take into account existing assignment information when
> >> it has it, which would just minimise unnecessary churn of (b).
> >>
> >> 158. I’ve changed it to ReadShareGroupStateSummary.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>
> >>> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Hi, Andrew,
> >>>
> >>> Thanks for the reply.
> >>>
> >>> 147. There seems to be some difference between consumer groups and
> share
> >>> groups. In the consumer groups, if a client receives a heartbeat
> response
> >>> to revoke some partitions, it may have to commit offsets before
> revoking
> >>> partitions or it may have to call the rebalance callbacks provided by
> the
> >>> user. This may take some time and can be reflected in the rebalance
> time
> >>> metric. In the share groups, none of that exists. If a client receives
> >> some
> >>> added/revoked partitions, it accepts them immediately, right? So, does
> >> that
> >>> practically make the rebalance time always 0?
> >>>
> >>> 150. I guess in the common case, there will be many more members than
> >>> partitions. So the need for (b) will be less common. We can probably
> >> leave
> >>> the persisting of the assignment out for now.
> >>>
> >>> 158. The new name sounds good to me.
> >>>
> >>> Jun
> >>>
> >>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <
> >> andrew_schofi...@live.com>
> >>> wrote:
> >>>
> >>>> Hi Jun,
> >>>> Thanks for the response.
> >>>>
> >>>> 147. I am trying to get a correspondence between the concepts and
> >>>> metrics of consumer groups and share groups. In both cases,
> >>>> the client doesn’t strictly know when the rebalance starts. All it
> knows
> >>>> is when it has work to do in order to perform its part of a rebalance.
> >>>> I am proposing that share groups and consumer groups use
> >>>> equivalent logic.
> >>>>
> >>>> I could remove the rebalance metrics from the client because I do
> >>>> understand that they are making a judgement about when a rebalance
> >>>> starts, but it’s their own part of the rebalance they are measuring.
> >>>>
> >>>> I tend to think these metrics are better than no metrics and
> >>>> will at least enable administrators to see how much rebalance
> >>>> activity the members of share groups are experiencing.
> >>>>
> >>>> 150. The simple assignor does not take existing assignments into
> >>>> consideration. The ShareGroupPartitionAssignor interface would
> >>>> permit this, but the simple assignor does not currently use it.
> >>>>
> >>>> The simple assignor assigns partitions in two ways:
> >>>> a) Distribute the members across the partitions by hashed member ID.
> >>>> b) If any partitions have no members assigned, distribute the members
> >>>> across these partitions round-robin.
> >>>>
> >>>> The (a) partitions will be quite stable. The (b) partitions will be
> less
> >>>> stable. By using existing assignment information, it could make (b)
> >>>> partition assignment more stable, whether the assignments are
> >>>> persisted or not. Perhaps it would be worth changing the simple
> >>>> assignor in order to make (b) more stable.
> >>>>
> >>>> I envisage more sophisticated assignors in the future which could use
> >>>> existing assignments and also other dynamic factors such as lag.
> >>>>
> >>>> If it transpires that there is significant benefit in persisting
> >>>> assignments
> >>>> specifically to help smooth assignment in the event of GC change,
> >>>> it would be quite an easy enhancement. I am not inclined to persist
> >>>> the assignments in this KIP.
> >>>>
> >>>> 158. Ah, yes. I see. Of course, I want the names as consistent and
> >>>> understandable too. I suggest renaming
> >>>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
> >>>> I haven’t changed the KIP yet, so let me know if that’s OK.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Hi, Andrew,
> >>>>>
> >>>>> Thanks for the reply.
> >>>>>
> >>>>> 147. " it makes a judgement about whether an assignment received is
> >> equal
> >>>>> to what it already is using."
> >>>>> If a client receives an assignment different from what it has, it
> >>>> indicates
> >>>>> the end of the rebalance. But how does the client know when the
> >> rebalance
> >>>>> starts? In the shareHeartbeat design, the new group epoch is
> propagated
> >>>>> together with the new assignment in the response.
> >>>>>
> >>>>> 150. It could be a potential concern if each GC change forces
> >> significant
> >>>>> assignment changes. Does the default assignor take existing
> assignments
> >>>>> into consideration?
> >>>>>
> >>>>> 155. Good point. Sounds good.
> >>>>>
> >>>>> 158. My concern with the current naming is that it's not clear what
> the
> >>>>> difference is between ReadShareGroupOffsetsState and
> >> ReadShareGroupState.
> >>>>> The state in the latter is also offsets.
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
> >>>> andrew_schofi...@live.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jun,
> >>>>>> Thanks for your reply.
> >>>>>>
> >>>>>> 147. Perhaps the easiest is to take a look at the code in
> >>>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl.
> >>>>>> This class is part of the new consumer group protocol
> >>>>>> code in the client. It makes state transitions based on
> >>>>>> the heartbeat requests and responses, and it makes a
> >>>>>> judgement about whether an assignment received is
> >>>>>> equal to what it already is using. When a state transition
> >>>>>> is deemed to be the beginning or end of a rebalance
> >>>>>> from the point of view of this client, it counts towards the
> >>>>>> rebalance metrics.
> >>>>>>
> >>>>>> Share groups will follow the same path.
> >>>>>>
> >>>>>> 150. I do not consider it a concern. Rebalancing a share group
> >>>>>> is less disruptive than rebalancing a consumer group. If the
> assignor
> >>>>>> Has information about existing assignments, it can use it. It is
> >>>>>> true that this information cannot be replayed from a topic and will
> >>>>>> sometimes be unknown as a result.
> >>>>>>
> >>>>>> 151. I don’t want to rename TopicPartitionsMetadata to
> >>>>>> simply TopicPartitions (it’s information about the partitions of
> >>>>>> a topic) because we then have an array of plurals.
> >>>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
> >>>>>>
> >>>>>> 152. Fixed.
> >>>>>>
> >>>>>> 153. It’s the GC. Fixed.
> >>>>>>
> >>>>>> 154. The UNKNOWN “state” is essentially a default for situations
> where
> >>>>>> the code cannot understand data it received. For example, let’s say
> >> that
> >>>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
> >>>>>> introduced another state THINKING, a tool built with Kafka 4.0 would
> >> not
> >>>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that the
> >>>>>> state was something that it could not understand.
> >>>>>>
> >>>>>> 155. No, it’s a the level of the share-partition. If the offsets for
> >>>> just
> >>>>>> one share-partition is reset, only the state epoch for that
> partition
> >> is
> >>>>>> updated.
> >>>>>>
> >>>>>> 156. Strictly speaking, it’s redundant. I think having the
> StartOffset
> >>>>>> separate gives helpful clarity and I prefer to retain it.
> >>>>>>
> >>>>>> 157. Yes, you are right. There’s no reason why a leader change needs
> >>>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the ShareUpdate.
> >>>>>>
> >>>>>> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
> >>>>>> having “State” in the name makes it clear that this one the family
> of
> >>>>>> inter-broker RPCs served by the share coordinator. The admin RPCs
> >>>>>> such as DescribeShareGroupOffsets do not include “State”.
> >>>>>>
> >>>>>> 159. Fixed.
> >>>>>>
> >>>>>> 160. Fixed.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>>>
> >>>>>>> Hi, Andrew,
> >>>>>>>
> >>>>>>> Thanks for the reply.
> >>>>>>>
> >>>>>>> 147. "The measurement is certainly from the point of view of the
> >>>> client,
> >>>>>>> but it’s driven by sending and receiving heartbeats rather than
> >> whether
> >>>>>> the
> >>>>>>> client triggered the rebalance itself."
> >>>>>>> Hmm, how does a client know which heartbeat response starts a
> >>>> rebalance?
> >>>>>>>
> >>>>>>> 150. PartitionAssignor takes existing assignments into
> consideration.
> >>>>>> Since
> >>>>>>> GC doesn't persist the assignment for share groups, it means that
> >>>>>>> ShareGroupPartitionAssignor can't reliably depend on existing
> >>>>>> assignments.
> >>>>>>> Is that a concern?
> >>>>>>>
> >>>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename
> >>>>>>> TopicPartitionsMetadata and TopicMetadata since there is no
> metadata?
> >>>>>>>
> >>>>>>> 152. ShareGroupMetadataKey: "versions": "3"
> >>>>>>> The versions should be 11.
> >>>>>>>
> >>>>>>> 153. ShareGroupDescription.coordinator(): The description says "The
> >>>> share
> >>>>>>> group coordinator". Is that the GC or SC?
> >>>>>>>
> >>>>>>> 154. "A share group has only three states - EMPTY , STABLE and
> DEAD".
> >>>>>>> What about UNKNOWN?
> >>>>>>>
> >>>>>>> 155. WriteShareGroupState: StateEpoch is at the group level, not
> >>>>>> partition
> >>>>>>> level, right?
> >>>>>>>
> >>>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's the
> same
> >>>> as
> >>>>>>> the smallest FirstOffset in StateBatches?
> >>>>>>>
> >>>>>>> 157. Every leader change forces a ShareSnapshotValue write to
> persist
> >>>> the
> >>>>>>> new leader epoch. Is that a concern? An alternative is to include
> >>>>>>> leaderEpoch in ShareUpdateValue.
> >>>>>>>
> >>>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets. Should
> we
> >>>>>> rename
> >>>>>>> it to something like ReadShareGroupStartOffset?
> >>>>>>>
> >>>>>>> 159. members are assigned members round-robin => members are
> assigned
> >>>>>>> round-robin
> >>>>>>>
> >>>>>>> 160. "may called": typo
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield <
> >>>>>> andrew_schofi...@live.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Jun,
> >>>>>>>> Thanks for the reply and sorry for the delay in responding.
> >>>>>>>>
> >>>>>>>> 123. Yes, I didn’t quite get your point earlier. The member
> >>>>>>>> epoch is bumped by the GC when it sends a new assignment.
> >>>>>>>> When the member sends its next heartbeat, it echoes back
> >>>>>>>> the member epoch, which will confirm the receipt of the
> >>>>>>>> assignment. It would send the same member epoch even
> >>>>>>>> after recovery of a network disconnection, so that should
> >>>>>>>> be sufficient to cope with this eventuality.
> >>>>>>>>
> >>>>>>>> 125. Yes, I have added it to the table which now matches
> >>>>>>>> the text earlier in the KIP. Thanks.
> >>>>>>>>
> >>>>>>>> 140. Yes, I have added it to the table which now matches
> >>>>>>>> the text earlier in the KIP. I’ve also added more detail for
> >>>>>>>> the case where the entire share group is being deleted.
> >>>>>>>>
> >>>>>>>> 141. Yes! Sorry for confusing things.
> >>>>>>>>
> >>>>>>>> Back to the original question for this point. To delete a share
> >>>>>>>> group, should the GC write a tombstone for each
> >>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>
> >>>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata
> >>>>>>>> records. But, deletion of a share group is only possible when
> >>>>>>>> the group is already empty, so the tombstones will have
> >>>>>>>> been written as a result of the members leaving the group.
> >>>>>>>>
> >>>>>>>> 143. Yes, that’s right.
> >>>>>>>>
> >>>>>>>> 147. The measurement is certainly from the point of view
> >>>>>>>> of the client, but it’s driven by sending and receiving heartbeats
> >>>>>>>> rather than whether the client triggered the rebalance itself.
> >>>>>>>> The client decides when it enters and leaves reconciliation
> >>>>>>>> of the assignment, and measures this period.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID>
> >> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi, Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for the reply.
> >>>>>>>>>
> >>>>>>>>> 123. "Rather than add the group epoch to the
> ShareGroupHeartbeat, I
> >>>>>> have
> >>>>>>>>> decided to go for TopicPartitions in ShareGroupHeartbeatRequest
> >> which
> >>>>>>>>> mirrors ConsumerGroupHeartbeatRequest."
> >>>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is
> that
> >>>>>> enough
> >>>>>>>>> for confirming the receipt of the new assignment?
> >>>>>>>>>
> >>>>>>>>> 125. This also means that "Alter share group offsets" needs to
> >> write
> >>>> a
> >>>>>>>>> ShareGroupPartitionMetadata record, if the partition is not
> already
> >>>>>>>>> initialized.
> >>>>>>>>>
> >>>>>>>>> 140. In the table for "Delete share group offsets", we need to
> add
> >> a
> >>>>>> step
> >>>>>>>>> to write a ShareGroupPartitionMetadata record with
> DeletingTopics.
> >>>>>>>>>
> >>>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the
> >>>> __consumer_offsets
> >>>>>>>>> topic, which is a compacted topic, right?
> >>>>>>>>>
> >>>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests to
> GC,
> >>>>>> which
> >>>>>>>>> then forwards it to SC?
> >>>>>>>>>
> >>>>>>>>> 147. I guess a client only knows the rebalance triggered by
> itself,
> >>>> but
> >>>>>>>> not
> >>>>>>>>> the ones triggered by other members or topic/partition changes?
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield <
> >>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jun,
> >>>>>>>>>> Thanks for the response.
> >>>>>>>>>>
> >>>>>>>>>> 123. Of course, ShareGroupHearbeat started off as
> >>>>>> ConsumerGroupHeartbeat
> >>>>>>>>>> and then unnecessary fields were removed. In the network issue
> >> case,
> >>>>>>>>>> there is not currently enough state being exchanged to be sure
> an
> >>>>>>>>>> assignment
> >>>>>>>>>> was received.
> >>>>>>>>>>
> >>>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat, I
> have
> >>>>>>>> decided
> >>>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest which
> >>>> mirrors
> >>>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group member
> >> does
> >>>>>>>>>> confirm the assignment it is using, and that can be used by the
> GC
> >>>> to
> >>>>>>>>>> safely
> >>>>>>>>>> stop repeating the assignment in heartbeat responses.
> >>>>>>>>>>
> >>>>>>>>>> 125. Ah, yes. This is indeed something possible with a consumer
> >>>> group
> >>>>>>>>>> and share groups should support it too. This does of course
> imply
> >>>> that
> >>>>>>>>>> ShareGroupPartitionMetadataValue needs an array of partitions,
> not
> >>>>>>>>>> just the number.
> >>>>>>>>>>
> >>>>>>>>>> 140. Yes, good spot. There is an inconsistency here in consumer
> >>>> groups
> >>>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at the
> >>>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only
> >> operates
> >>>>>>>>>> at the topic level.
> >>>>>>>>>>
> >>>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at the
> >>>>>>>> partition
> >>>>>>>>>> level only. You can reset them, but if you’re actively using a
> >> topic
> >>>>>>>> with
> >>>>>>>>>> a share group, I don’t see why you’d want to delete offsets
> rather
> >>>>>> than
> >>>>>>>>>> reset. If you’ve finished using a topic with a share group and
> >> want
> >>>> to
> >>>>>>>>>> clean
> >>>>>>>>>> up, use delete.
> >>>>>>>>>>
> >>>>>>>>>> So, I’ve changed the AdminClient.deleteConsumerGroupOffsets to
> be
> >>>>>>>>>> topic-based and the RPCs behind it.
> >>>>>>>>>>
> >>>>>>>>>> The GC reconciles the cluster state with the
> >>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>> to spot deletion of topics and the like. However, when the
> offsets
> >>>> for
> >>>>>>>>>> a topic were deleted manually, the topic very like still exists
> so
> >>>>>>>>>> reconciliation
> >>>>>>>>>> alone is not going to be able to continue an interrupted
> operation
> >>>>>> that
> >>>>>>>>>> has started. So, I’ve added DeletingTopics back into
> >>>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so failover
> of
> >> a
> >>>> GC
> >>>>>>>>>> can continue where it left off rather than leaving fragments
> >> across
> >>>>>> the
> >>>>>>>>>> SCs.
> >>>>>>>>>>
> >>>>>>>>>> 141. That is not required. Because this is not a compacted
> topic,
> >> it
> >>>>>> is
> >>>>>>>>>> not necessary to write tombstones for every key. As long as
> there
> >>>> is a
> >>>>>>>>>> clear and unambiguous record for the deletion of the group, that
> >> is
> >>>>>>>> enough.
> >>>>>>>>>> The tombstone for ShareGroupPartitionMetadata is theoretically
> not
> >>>>>>>>>> required but it’s a single record, rather than one per member,
> so
> >> I
> >>>>>>>> prefer
> >>>>>>>>>> to leave it as a record that the interactions with the SC have
> >> been
> >>>>>>>>>> completed.
> >>>>>>>>>>
> >>>>>>>>>> 142.
> >>>>>>>>>> 142.1. It will prompt the user to confirm they want to continue.
> >>>>>>>>>> This is in common with `kafka-consumer-groups.sh` which
> >> historically
> >>>>>>>>>> has defaulted to --dry-run behaviour, but is due to change to
> >>>>>> prompting
> >>>>>>>>>> if neither --dry-run nor --execute is specified “in a future
> major
> >>>>>>>>>> release”.
> >>>>>>>>>>
> >>>>>>>>>> 142.2. It should support partition-level reset, but only
> >> topic-level
> >>>>>>>>>> delete.
> >>>>>>>>>> I have updated the usage text accordingly. This is in common
> with
> >>>>>>>>>> kafka-consumer-groups.sh.
> >>>>>>>>>>
> >>>>>>>>>> 142.3. --dry-run displays the operation that would be executed.
> >>>>>>>>>>
> >>>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to the
> >>>>>>>>>> usage text.
> >>>>>>>>>>
> >>>>>>>>>> 143. DescribeShareGroupOffsets is served by the group
> coordinator
> >>>>>>>>>> for this kind of reason.
> >>>>>>>>>>
> >>>>>>>>>> 144. That’s the default. If you haven’t asked to release or
> >> reject,
> >>>> it
> >>>>>>>>>> accepts.
> >>>>>>>>>> This is analogous to fetching and committing offsets in a
> consumer
> >>>>>>>> group.
> >>>>>>>>>>
> >>>>>>>>>> 145. I think this is a good idea, but I would prefer to defer it
> >>>>>> until a
> >>>>>>>>>> future
> >>>>>>>>>> metrics KIP that I have planned. In KIP-932, I have added basic
> >>>>>> metrics
> >>>>>>>>>> only.
> >>>>>>>>>> For example, you’ll see that there’s no concept of lag yet,
> which
> >>>>>> surely
> >>>>>>>>>> will have relevance for share groups. I plan to create and
> deliver
> >>>> the
> >>>>>>>>>> metrics KIP before share groups are declared ready for
> production.
> >>>>>>>>>> I want the new metrics to be developed with the experience of
> >>>> running
> >>>>>>>>>> the code.
> >>>>>>>>>>
> >>>>>>>>>> 146. Milliseconds. KIP updated.
> >>>>>>>>>>
> >>>>>>>>>> 147. There is a membership state machine in the client that
> >>>>>>>>>> changes states as the ShareGroupHeartbeat requests and responses
> >>>>>>>>>> flow. The duration of a rebalance will be shorter from the point
> >> of
> >>>>>>>>>> view of the share-group consumer because it doesn’t have to
> worry
> >>>>>> about
> >>>>>>>>>> rebalance callbacks and committing offsets as the partitions
> move
> >>>>>>>>>> around, but the overall flow is very similar. So, it’s the state
> >>>>>>>>>> transitions
> >>>>>>>>>> that drive the collection of the rebalance metrics.
> >>>>>>>>>>
> >>>>>>>>>> 148. Strangely, none of the existing uses of
> >> records-per-request-avg
> >>>>>>>>>> actually have records-per-request-max. I tend to err on the side
> >> of
> >>>>>>>>>> consistency, but can’t think of any reason not to add this.
> Done.
> >>>>>>>>>>
> >>>>>>>>>> 149. There are several error codes for
> >> WriteShareGroupStateResponse:
> >>>>>>>>>>
> >>>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re
> looking
> >>>>>> for.
> >>>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC can’t
> >>>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the topic.
> >>>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this group.
> >>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for this
> >>>>>>>>>> topic-partition.
> >>>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch.
> >>>>>>>>>> INVALID_REQUEST - There was a problem with the request.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Andrew
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID>
> >>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the response.
> >>>>>>>>>>>
> >>>>>>>>>>> 123. I thought the network issue can be covered with the group
> >>>> epoch.
> >>>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up the
> >>>> epoch
> >>>>>>>>>> first,
> >>>>>>>>>>> but doesn't expose the new epoch to members until the
> assignment
> >> is
> >>>>>>>>>>> complete (including initializing the sharePartitionState). Once
> >> the
> >>>>>>>>>>> assignment is complete, GC includes the bumped up epoch and the
> >> new
> >>>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat
> >> Request
> >>>>>>>>>> includes
> >>>>>>>>>>> the new epoch, it means that the member has received the new
> >>>>>> assignment
> >>>>>>>>>> and
> >>>>>>>>>>> GC can exclude the assignment in the heartbeatResponse.
> >>>>>>>>>>>
> >>>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a topic-partition
> >>>> which
> >>>>>>>> is
> >>>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not really
> part
> >>>> of
> >>>>>>>> the
> >>>>>>>>>>> group yet. So I think it’s permitted to fail the
> >>>>>> AlterShareGroupOffsets
> >>>>>>>>>>> because the topic-partition would not be part of
> >>>>>> ListShareGroupOffsets
> >>>>>>>>>>> result."
> >>>>>>>>>>> A user may want to initialize SPSO (e.g. based on timestamp)
> >> before
> >>>>>> the
> >>>>>>>>>>> application is first used. If we reject AlterShareGroupOffsets
> >> when
> >>>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be
> >> forced
> >>>> to
> >>>>>>>>>> start
> >>>>>>>>>>> the application with the wrong SPSO first and then reset it,
> >> which
> >>>>>> will
> >>>>>>>>>> be
> >>>>>>>>>>> inconvenient.
> >>>>>>>>>>>
> >>>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of
> SPSO
> >>>> for
> >>>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue
> only
> >>>>>> tracks
> >>>>>>>>>>> consecutive partitions.
> >>>>>>>>>>>
> >>>>>>>>>>> 141. Delete share group: Should GC also write a tombstone for
> >> each
> >>>>>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>>>>
> >>>>>>>>>>> 142. kafka-share-groups.sh
> >>>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is
> >> specified
> >>>>>> for
> >>>>>>>>>>> reset-offsets?
> >>>>>>>>>>> 142.2 Should it support --delete-offsets and --reset-offsets at
> >> the
> >>>>>>>>>>> partition level to match the AdminClient api?
> >>>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't
> carry
> >>>> the
> >>>>>>>>>>> dry-run flag.
> >>>>>>>>>>> 142.4 --state [String]: What are the valid state values?
> >>>>>>>>>>>
> >>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the share-partition
> >>>>>> leader.
> >>>>>>>>>> How
> >>>>>>>>>>> could a user describe the share group offset when there is no
> >>>> member?
> >>>>>>>>>>>
> >>>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to
> >> accept
> >>>>>>>>>> consumed
> >>>>>>>>>>> messages?
> >>>>>>>>>>>
> >>>>>>>>>>> 145. It would be useful to add a broker side metric that
> measures
> >>>> the
> >>>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g. the
> >>>>>> fraction
> >>>>>>>>>> of
> >>>>>>>>>>> the time that SPL is blocked because
> >>>>>>>>>> group.share.partition.max.record.locks
> >>>>>>>>>>> is reached.
> >>>>>>>>>>>
> >>>>>>>>>>> 146. heartbeat-response-time-max: What's the unit?
> >>>>>>>>>>>
> >>>>>>>>>>> 147. rebalance related metrics: How does a share consumer know
> >> when
> >>>>>>>> there
> >>>>>>>>>>> is a rebalance and how does it measure the rebalance time?
> >>>>>>>>>>>
> >>>>>>>>>>> 148. records-per-request-avg: Should we pair it with
> >>>>>>>>>>> records-per-request-max?
> >>>>>>>>>>>
> >>>>>>>>>>> 149. If the shareGroupState is not available, what error code
> is
> >>>> used
> >>>>>>>> in
> >>>>>>>>>>> WriteShareGroupStateResponse?
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield <
> >>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 123. The GC only sends the assignment in ShareGroupHeartbeat
> >>>>>>>>>>>> response if the member has just joined, or the assignment has
> >> been
> >>>>>>>>>>>> recalculated. This latter condition is met when the GC fails
> >> over.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With a consumer group, it works a little differently. The
> >>>>>> heartbeating
> >>>>>>>>>>>> occurs asynchronously with the reconciliation process in the
> >>>> client.
> >>>>>>>>>>>> The client has reconciliation callbacks, as well as offsets to
> >>>>>> commit
> >>>>>>>>>>>> before it can confirm that revocation has occured. So, it
> might
> >>>> take
> >>>>>>>>>>>> multiple heartbeats to complete the installation of a new
> target
> >>>>>>>>>>>> assignment in the client.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With a share group, it’s more brutal. The GC sends the
> >> assignment
> >>>>>>>>>>>> in a heartbeat response, and the client is assumed to have
> acted
> >>>> on
> >>>>>>>>>>>> It immediately. Since share group assignment is really about
> >>>>>> balancing
> >>>>>>>>>>>> consumers across the available partitions, rather than safely
> >>>>>> handing
> >>>>>>>>>>>> ownership of partitions between consumers, there is less
> >>>>>> coordination.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I wonder whether there is one situation that does need
> >>>> considering.
> >>>>>>>>>>>> If the network connection between the client and the GC is
> lost,
> >>>> it
> >>>>>> is
> >>>>>>>>>>>> possible that a response containing the assignment is lost.
> >> Then,
> >>>>>>>>>>>> the connection will be reestablished, and the assignment will
> >> only
> >>>>>> be
> >>>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of
> >>>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one
> >>>>>>>>>>>> way to close that.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 125. Yes, you are right. The reconciliation that I describe in
> >> the
> >>>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets is
> >>>> called
> >>>>>>>>>>>> for a topic-partition which is not yet in
> >>>>>>>>>> ShareGroupPartitionMetadataValue,
> >>>>>>>>>>>> it’s not really part of the group yet. So I think it’s
> permitted
> >>>> to
> >>>>>>>> fail
> >>>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition would
> not
> >>>> be
> >>>>>>>>>>>> part of ListShareGroupOffsets result.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition
> which
> >> is
> >>>>>> in
> >>>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling
> >>>>>>>>>>>> InitializeShareGroupState to set its offset would be
> sufficient
> >>>>>>>> without
> >>>>>>>>>>>> writing ShareGroupPartitionMetadata again.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 138. Added.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 139. This week, I have been experimenting with using the
> >> existing
> >>>>>>>>>>>> consumer metrics with share consumer code. That was my plan to
> >> get
> >>>>>>>>>>>> started with metrics. While they work to some extent, I am not
> >>>>>>>> entirely
> >>>>>>>>>>>> happy with the result.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have added a basic set of share consumer-specific metrics to
> >>>>>> KIP-932
> >>>>>>>>>>>> which I think is a step in the right direction. I anticipate a
> >>>>>> future
> >>>>>>>>>> KIP
> >>>>>>>>>>>> that defines many more metrics and tackles concepts such as
> what
> >>>>>>>>>>>> “lag” means for a share group. The metrics I’ve included are
> >>>> simply
> >>>>>>>>>>>> counting and measuring the operations, which is a good start.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 54. I think either with or without
> >>>> InitializingTopics/DeletingTopics
> >>>>>>>> in
> >>>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think there
> >> is
> >>>>>>>>>>>> a deeper point behind what you said. The GC needs to be
> >> responsive
> >>>>>>>>>>>> to topic creation and deletion, and addition of partitions in
> >>>> cases
> >>>>>>>>>> where
> >>>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics. So,
> we
> >>>> are
> >>>>>>>>>>>> probably not saving anything by having those fields. As a
> >> result,
> >>>> I
> >>>>>>>> have
> >>>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated the
> >> KIP
> >>>>>>>>>>>> accordingly. The GC needs to reconcile its view of the
> >> initialised
> >>>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and it
> >> will
> >>>>>>>>>>>> initialize or delete share-group state accordingly.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have made one more change to the KIP. The
> >> SharePartitionAssignor
> >>>>>>>>>>>> interface has been renamed to
> >>>>>>>>>>>> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor
> and
> >>>> it
> >>>>>>>>>>>> now extends the PartitionAssignor interface. It’s essentially
> a
> >>>>>> marker
> >>>>>>>>>>>> of which partition assignors can be used with share groups.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Andrew
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID>
> >>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to the
> >> GC."
> >>>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment to
> GC
> >>>> to
> >>>>>>>>>>>>> avoid GC including the assignment in the heartbeat response
> >>>>>>>>>>>> continuously. I
> >>>>>>>>>>>>> assume this is done by including the new group epoch in the
> >>>>>> heartbeat
> >>>>>>>>>>>>> response.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 125. It's possible that the share partition has never been
> >>>>>>>> initialized
> >>>>>>>>>>>> when
> >>>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write
> >>>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it would
> >>>>>>>>>> reinitialize
> >>>>>>>>>>>>> the share partition and lose the effect of
> >>>> AlterShareGroupOffsets.
> >>>>>> If
> >>>>>>>>>> the
> >>>>>>>>>>>>> partition has already been initialized and it's recorded
> >>>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to write
> >>>>>>>>>>>>> ShareGroupPartitionMetadata again when handling
> >>>>>>>> AlterShareGroupOffsets.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 54. "I don’t think there is any redundancy. The
> >>>>>>>>>> ShareGroupMemberMetadata
> >>>>>>>>>>>>> does include a list of subscribed topics. However, if there
> is
> >> a
> >>>>>>>> period
> >>>>>>>>>>>> of
> >>>>>>>>>>>>> time in which no members are subscribed to a particular
> topic,
> >> it
> >>>>>>>> does
> >>>>>>>>>>>> not
> >>>>>>>>>>>>> mean that the topic’s ShareGroupState should be immediately
> >>>>>> removed,
> >>>>>>>>>> but
> >>>>>>>>>>>> it
> >>>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata
> >> records
> >>>>>>>>>>>> containing
> >>>>>>>>>>>>> that topic."
> >>>>>>>>>>>>> I am still trying to understand the value of
> InitializingTopics
> >>>> and
> >>>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They are
> >> used
> >>>>>> to
> >>>>>>>>>>>>> remember the intention of an operation. However, GC still
> needs
> >>>> to
> >>>>>>>>>> handle
> >>>>>>>>>>>>> the case when the intention is not safely recorded. If GC
> wants
> >>>> to
> >>>>>>>>>>>>> initialize a new topic/partition, a simpler approach is for
> it
> >> to
> >>>>>>>> send
> >>>>>>>>>> an
> >>>>>>>>>>>>> InitializeShareGroupState to the share coordinator and after
> >>>>>>>> receiving
> >>>>>>>>>> a
> >>>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue with
> >> the
> >>>>>>>>>>>>> initialized partition included in InitializedTopics. This
> >> saves a
> >>>>>>>>>> record
> >>>>>>>>>>>>> write. It's possible for GC to fail in between the two steps.
> >> On
> >>>>>>>>>>>> failover,
> >>>>>>>>>>>>> the new GC just repeats the process. The case that you
> >> mentioned
> >>>>>>>> above
> >>>>>>>>>>>> can
> >>>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics of
> >>>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member subscribes to
> >> it,
> >>>>>> we
> >>>>>>>>>> can
> >>>>>>>>>>>>> still keep the ShareGroupState as long as the topic still
> >> exists.
> >>>>>> The
> >>>>>>>>>>>> same
> >>>>>>>>>>>>> optimization could be applied to DeletingTopics too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <
> >>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute the
> >>>>>>>>>> assignment
> >>>>>>>>>>>>>> for every member. However, the impact of re-assignment is
> not
> >>>> that
> >>>>>>>>>>>> onerous.
> >>>>>>>>>>>>>> If the recomputed assignments are the same, which they may
> >> well
> >>>>>> be,
> >>>>>>>>>>>> there
> >>>>>>>>>>>>>> is no impact on the members at all.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On receiving the new assignment, the member adjusts the
> >>>>>>>>>> topic-partitions
> >>>>>>>>>>>>>> in its share sessions, removing those which were revoked and
> >>>>>> adding
> >>>>>>>>>>>> those
> >>>>>>>>>>>>>> which were assigned. It is able to acknowledge the records
> it
> >>>>>>>> fetched
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>> the partitions which have just been revoked, and it doesn’t
> >> need
> >>>>>> to
> >>>>>>>>>>>> confirm
> >>>>>>>>>>>>>> the assignment back to the GC.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 125. I don’t think the GC needs to write
> >>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>>>>>> when processing AlterShareGroupOffsets. This is because the
> >>>>>>>> operation
> >>>>>>>>>>>>>> happens as a result of an explicit administrative action and
> >> it
> >>>> is
> >>>>>>>>>>>> possible
> >>>>>>>>>>>>>> to return a specific error code for each topic-partition.
> The
> >>>>>> cases
> >>>>>>>>>>>> where
> >>>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is
> added
> >> or
> >>>>>>>>>> removed
> >>>>>>>>>>>>>> from the subscribed topics, or the number of partitions
> >> changes.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for a
> >>>> cluster
> >>>>>> to
> >>>>>>>>>>>>>> prevent
> >>>>>>>>>>>>>> a group from having an excessively low value. Config added.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 131. I have changed it to
> >>>> group.share.partition.max.record.locks.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 136.  When GC failover occurs, the GC gaining ownership of a
> >>>>>>>> partition
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>> the __consumer_offsets topic replays the records to build
> its
> >>>>>> state.
> >>>>>>>>>>>>>> In the case of a share group, it learns:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> * The share group and its group epoch (ShareGroupMetadata)
> >>>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata)
> >>>>>>>>>>>>>> * The list of share-partitions (ShareGroupPartitionMetadata)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> It will recompute the assignments in order to respond to
> >>>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the
> group
> >>>>>> epoch.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will update the KIP accordingly to confirm the behaviour.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the
> >>>>>>>>>>>>>> group-coordinator-metrics
> >>>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs
> offset
> >>>>>>>> commit,
> >>>>>>>>>>>>>> the share group equivalent is performed by the SPL. So, I’ve
> >>>>>> grouped
> >>>>>>>>>> the
> >>>>>>>>>>>>>> concepts which relate to the group in
> >> group-coordinator-metrics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The SC reports the metrics in the share-coordinator-metrics
> >>>> group.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 137.2: There is one metric in both groups -
> >> partition-load-time.
> >>>>>> In
> >>>>>>>>>> the
> >>>>>>>>>>>> SC
> >>>>>>>>>>>>>> group,
> >>>>>>>>>>>>>> it refers to the time loading data from the share-group
> state
> >>>>>> topic
> >>>>>>>> so
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC
> >> group,
> >>>>>>>>>>>>>> it refers to the time to read the state from the persister.
> >>>> Apart
> >>>>>>>> from
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to be
> very
> >>>>>>>> close.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Later, for a cluster which is using a custom persister, the
> >>>>>>>>>>>>>> share-coordinator
> >>>>>>>>>>>>>> metrics would likely not be reported, and the persister
> would
> >>>> have
> >>>>>>>> its
> >>>>>>>>>>>> own
> >>>>>>>>>>>>>> metrics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 137.3: Correct. Fixed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the
> internal
> >>>>>> topic.
> >>>>>>>>>>>>>> I’ve tweaked the description.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao <j...@confluent.io.INVALID
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 123. "The share group does not persist the target
> >> assignment."
> >>>>>>>>>>>>>>> What's the impact of this? Everytime that GC fails over, it
> >>>> needs
> >>>>>>>> to
> >>>>>>>>>>>>>>> recompute the assignment for every member. Do we expect the
> >>>>>> member
> >>>>>>>>>>>>>>> assignment to change on every GC failover?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 125. Should the GC also write ShareGroupPartitionMetadata?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when
> >>>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds
> good.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 130. Should we have a
> >> group.share.min.record.lock.duration.ms
> >>>> to
> >>>>>>>>>> pair
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>> group.share.max.record.lock.duration.ms?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 131. Sounds good. The name
> >>>>>> group.share.record.lock.partition.limit
> >>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>> seem very intuitive. How about something
> >>>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I guess
> it
> >>>>>> needs
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> compute member reassignment and check if there is any new
> >>>>>>>>>>>> topic/partition
> >>>>>>>>>>>>>>> matching the share subscription. Does it bump up the group
> >>>> epoch?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 137. Metrics:
> >>>>>>>>>>>>>>> 137.1 It would be useful to document who reports each
> metric.
> >>>> Is
> >>>>>> it
> >>>>>>>>>> any
> >>>>>>>>>>>>>>> broker, GC, SC or SPL?
> >>>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at SPL
> or
> >>>> SC?
> >>>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the
> share-group
> >>>>>> state
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>>> the share-group state partitions loaded in the last 30
> >>>> seconds."
> >>>>>>>>>>>>>>> The window depends on metrics.num.samples and
> >>>>>>>>>> metrics.sample.window.ms
> >>>>>>>>>>>>>>> and is not always 30 seconds, right?
> >>>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more?
> Does
> >> it
> >>>>>>>>>> include
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> time to write to the internal topic?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
> >>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>> Thanks for your comments.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 120. Thanks. Fixed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which
> snapshot
> >>>>>>>>>>>>>>>> the update applies to. It should of course be the snapshot
> >>>> that
> >>>>>>>>>>>> precedes
> >>>>>>>>>>>>>>>> it in the log. It’s just there to provide a consistency
> >> check.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing
> >> StateEpoch.
> >>>>>> It
> >>>>>>>>>>>>>>>> isn’t any more.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
> >>>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact, there is
> >>>>>>>>>>>> considerable
> >>>>>>>>>>>>>>>> divergence between the KIP and the code for this record
> >> value
> >>>>>>>> schema
> >>>>>>>>>>>>>>>> which I expect will be resolved when the migration code
> has
> >>>> been
> >>>>>>>>>>>>>>>> completed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 123. The share group does not persist the target
> assignment.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 124. Share groups have three kinds of record:
> >>>>>>>>>>>>>>>> i) ShareGroupMetadata
> >>>>>>>>>>>>>>>> - this contains the group epoch and is written whenever
> the
> >>>>>> group
> >>>>>>>>>>>>>>>> epoch changes.
> >>>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata
> >>>>>>>>>>>>>>>> - this does not contain the group epoch.
> >>>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata
> >>>>>>>>>>>>>>>> - this currently contains the epoch, but I think that is
> >>>>>>>>>> unnecessary.
> >>>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata
> definition
> >>>>>>>>>>>>>>>> contains the group epoch, but the value appears never to
> be
> >>>> set.
> >>>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is
> removing
> >>>> it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have removed the Epoch from ShareGroupPartitionMetadata.
> >>>>>>>>>>>>>>>> The only purpose of the persisting the epoch for a share
> >> group
> >>>>>> is
> >>>>>>>> so
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> when a group coordinator takes over the share group, it is
> >>>> able
> >>>>>> to
> >>>>>>>>>>>>>>>> continue the sequence of epochs.
> >> ShareGroupMetadataValue.Epoch
> >>>>>>>>>>>>>>>> is used for this.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case and
> >>>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP
> >>>> updated.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted when
> it
> >>>> has
> >>>>>>>> no
> >>>>>>>>>>>>>>>> members, so the tombstones for ShareGroupMemberMetadata
> will
> >>>>>>>>>>>>>>>> have been written when the members left. I have clarified
> >>>> this.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group epoch.
> >>>> When
> >>>>>>>> the
> >>>>>>>>>>>>>>>> group coordinator is initializing the share-group state
> the
> >>>>>> first
> >>>>>>>>>> time
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> a share-partition is being added to an assignment in the
> >>>> group,
> >>>>>>>> the
> >>>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the group
> >> epoch
> >>>>>>>>>>>>>>>> increases over time, the share coordinator is entirely
> >>>> unaware.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> When the first consumer for a share-partition fetches
> >> records
> >>>>>>>> from a
> >>>>>>>>>>>>>>>> share-partition leader, the SPL calls the share
> coordinator
> >> to
> >>>>>>>>>>>>>>>> ReadShareGroupState. If the SPL has previously read the
> >>>>>>>> information
> >>>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms
> it's
> >> up
> >>>>>> to
> >>>>>>>>>> date
> >>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Even if many consumers are joining at the same time, any
> >>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>> which is being initialized will not be included in their
> >>>>>>>>>> assignments.
> >>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>>>> the initialization is complete, the next rebalance will
> >> assign
> >>>>>> the
> >>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>> to some consumers which will discover this by
> >>>>>> ShareGroupHeartbeat
> >>>>>>>>>>>>>>>> response. And then, the fetching begins.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s read
> the
> >>>>>> state
> >>>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait up to
> >>>>>>>> MaxWaitMs
> >>>>>>>>>>>>>>>> and then it can return an empty set of records if it’s
> still
> >>>> not
> >>>>>>>>>>>> ready.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a
> topic
> >>>> with
> >>>>>>>>>> many
> >>>>>>>>>>>>>>>> partitions is added to the subscribed topics for a share
> >>>> group,
> >>>>>>>> the
> >>>>>>>>>>>> fact
> >>>>>>>>>>>>>>>> that the assignments will only start to include the
> >> partitions
> >>>>>> as
> >>>>>>>>>>>> their
> >>>>>>>>>>>>>>>> initialization completes should soften the impact.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature when
> it’s
> >>>>>>>>>> finished
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and
> >>>>>>>>>> `group.version`.
> >>>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the
> >>>>>>>>>>>> `group.share.enable`
> >>>>>>>>>>>>>>>> configuration will turn it on.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have described `group.share.enable` as an internal
> >>>>>> configuration
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>> the KIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms`
> >> applies
> >>>>>> to
> >>>>>>>>>>>> groups
> >>>>>>>>>>>>>>>> which do not specify a group-level configuration for lock
> >>>>>>>> duration.
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>>>> minimum and maximum for this configuration are intended to
> >>>> give
> >>>>>> it
> >>>>>>>>>>>>>>>> sensible bounds.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If a group does specify its own `
> >>>>>>>>>> group.share.record.lock.duration.ms
> >>>>>>>>>>>> `,
> >>>>>>>>>>>>>>>> the broker-level `group.share.max.record.lock.duration.ms
> `
> >>>>>> gives
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value for
> >> all
> >>>>>>>>>> groups.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> While editing, I renamed `
> >>>>>> group.share.record.lock.duration.max.ms
> >>>>>>>> `
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for consistency
> >>>> with
> >>>>>>>> the
> >>>>>>>>>>>>>>>> rest of the min/max configurations.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 131. This is the limit per partition so you can go wider
> >> with
> >>>>>>>>>> multiple
> >>>>>>>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>> I have set the initial value low for safety. I expect to
> be
> >>>> able
> >>>>>>>> to
> >>>>>>>>>>>>>>>> increase this
> >>>>>>>>>>>>>>>> significantly when we have mature code which has been
> >>>>>>>> battle-tested.
> >>>>>>>>>>>>>>>> Rather than try to guess how high it can safely go, I’ve
> >> erred
> >>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>>>>> side
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> caution and expect to open it up in a future KIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two
> group
> >>>>>>>>>>>>>>>> configurations,
> >>>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms and
> >>>>>>>>>>>>>>>> group.share.heartbeat.timeout.ms . The configurations you
> >>>>>>>> mentioned
> >>>>>>>>>>>>>>>> are the bounds for the group-level configurations.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to mirror
> >> the
> >>>>>>>>>> existing
> >>>>>>>>>>>>>>>> `group.consumer.max.size`.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid
> >> assignors
> >>>>>> for
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> cluster.
> >>>>>>>>>>>>>>>> When the assignors are configurable at the group level,
> the
> >>>>>> group
> >>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>> will only be permitted to name an assignor which is in
> this
> >>>>>> list.
> >>>>>>>>>> For
> >>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>> is no group configuration for assignor, so all groups get
> >> the
> >>>>>> one
> >>>>>>>>>> and
> >>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>> assignor in the list.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a cluster
> >>>> with a
> >>>>>>>>>> small
> >>>>>>>>>>>>>>>> number of
> >>>>>>>>>>>>>>>> of brokers and a lot of share group activity, it may be
> >>>>>>>> appropriate
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> increase
> >>>>>>>>>>>>>>>> this. We will be able to give tuning advice once we have
> >>>>>>>> experience
> >>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> performance impact of increasing it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao
> <j...@confluent.io.INVALID
> >>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 120. There is still reference to
> ConsumerGroupMetadataKey.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change it
> >>>> since
> >>>>>>>> it's
> >>>>>>>>>>>>>> not a
> >>>>>>>>>>>>>>>>> snapshot?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, but
> >>>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know how
> the
> >>>>>> epoch
> >>>>>>>> is
> >>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in the
> >>>> share
> >>>>>>>>>>>> group?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 123. There is no equivalent of
> >>>>>>>> ConsumerGroupTargetAssignmentMember
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> ShareGroup. How does the shareGroup persist the member
> >>>>>>>> assignment?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a topic-partition is
> >>>>>>>> assigned
> >>>>>>>>>>>> to a
> >>>>>>>>>>>>>>>>> member of a share group for the first time, the group
> >>>>>> coordinator
> >>>>>>>>>>>>>> writes
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the
> >> __consumer_offsets
> >>>>>>>>>> topic".
> >>>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata with
> the
> >>>>>>>> bumped
> >>>>>>>>>>>>>>>> epoch?
> >>>>>>>>>>>>>>>>> In general, is there a particular order to bump up the
> >> epoch
> >>>> in
> >>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the
> >>>>>>>>>>>> sharePartitionLeader?
> >>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>> would be useful to make this clear in other state
> changing
> >>>>>>>>>> operations
> >>>>>>>>>>>>>>>> too.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and
> share
> >>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>> Only empty share groups support this operation. The group
> >>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>> sends
> >>>>>>>>>>>>>>>>> an InitializeShareGroupState  request to the share
> >>>> coordinator.
> >>>>>>>> The
> >>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the new
> >> state
> >>>>>>>> epoch
> >>>>>>>>>> to
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> __share_group_state  topic."
> >>>>>>>>>>>>>>>>> Does the operation need to write
> >> ShareGroupPartitionMetadata
> >>>>>> and
> >>>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also need to
> >>>>>> write a
> >>>>>>>>>>>>>>>> tombstone
> >>>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new consumer
> >>>> joining
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the
> >>>>>>>>>> ShareCoordinator
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the
> >>>>>> SharePartition
> >>>>>>>>>>>> leader
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> reinitialize the state with ReadShareGroupOffsetsState.
> If
> >>>> many
> >>>>>>>>>>>>>> consumers
> >>>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would
> there
> >> be
> >>>>>> too
> >>>>>>>>>>>> much
> >>>>>>>>>>>>>>>> load
> >>>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the
> >>>>>>>> group.version
> >>>>>>>>>>>>>>>>> feature, should we make it an internal config?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this seems
> >>>>>> redundant
> >>>>>>>>>> with
> >>>>>>>>>>>>>>>>> group.share.enable?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 130. Why do we have both
> >> group.share.record.lock.duration.ms
> >>>>>> and
> >>>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with its
> own
> >>>> max
> >>>>>>>>>>>> value?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults to
> >> 200.
> >>>>>>>> This
> >>>>>>>>>>>>>> limits
> >>>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right? If
> >>>> there
> >>>>>>>> are
> >>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>> records per batch, it could be even smaller.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 132. Why do we need all three of
> >>>>>> group.share.session.timeout.ms,
> >>>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and
> >>>>>>>>>>>>>>>> group.share.max.session.timeout.ms?
> >>>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for
> >>>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 133. group.share.max.size: Would
> >>>>>>>> group.share.max.members.per.group
> >>>>>>>>>>>> be a
> >>>>>>>>>>>>>>>>> more intuitive name?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a
> list?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share
> >> coordinator
> >>>>>> or
> >>>>>>>>>> per
> >>>>>>>>>>>>>>>> broker?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
> >>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>> Thanks for you reply.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to
> >>>>>> FirstOffset.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP. Personally,
> I
> >>>>>> don’t
> >>>>>>>>>> mind
> >>>>>>>>>>>>>>>> having
> >>>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 114. Done.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record
> because
> >>>>>> there
> >>>>>>>>>> is
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>> data to consume, committing on every record is OK. It’s
> >>>>>>>>>> inefficient
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>> acceptable.
> >>>>>>>>>>>>>>>>>> If the first poll returns just one record, but many more
> >>>> have
> >>>>>>>>>> piled
> >>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>>> the first one was being processed, the next poll has the
> >>>>>>>>>> opportunity
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be
> >>>> committed
> >>>>>>>>>>>>>> together.
> >>>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to
> return
> >>>>>>>> batches
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> records when the records are available is the approach
> we
> >>>> will
> >>>>>>>>>> take
> >>>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 116. Good idea. Done.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration section.
> >> Let
> >>>> me
> >>>>>>>>>> know
> >>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>> you think.
> >>>>>>>>>>>>>>>>>> I am discussing the configuration to enable the feature
> in
> >>>> the
> >>>>>>>>>>>> mailing
> >>>>>>>>>>>>>>>>>> list with
> >>>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in
> this
> >>>> area
> >>>>>>>>>>>> still.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao
> >> <j...@confluent.io.INVALID
> >>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the updated KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple
> times
> >>>>>> when
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same state
> >> epoch
> >>>>>> was
> >>>>>>>>>>>>>> reused
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> initialize the state."
> >>>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure that
> it
> >>>> has
> >>>>>>>>>>>> received
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused?
> >>>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger a
> >>>>>> rebalance"
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group
> epoch
> >>>> to
> >>>>>> be
> >>>>>>>>>>>>>> bumped
> >>>>>>>>>>>>>>>>>> too?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become
> >> FirstOffset
> >>>>>> in
> >>>>>>>>>> ALL
> >>>>>>>>>>>>>>>>>> schemas
> >>>>>>>>>>>>>>>>>>> defined in the KIP."
> >>>>>>>>>>>>>>>>>>> Yes, that seems better to me.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for 3."
> >>>>>>>>>>>>>>>>>>> Should we document it?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer configuration
> is
> >>>>>>>>>>>> deprecated
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the
> shareConsumer
> >>>>>>>>>>>>>> configuration.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 115. I am wondering if it's a good idea to always
> commit
> >>>> acks
> >>>>>>>> on
> >>>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each batch
> may
> >>>>>> only
> >>>>>>>>>>>>>> contain
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> single record and each poll() only returns a single
> >> batch.
> >>>>>> This
> >>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> cause
> >>>>>>>>>>>>>>>>>>> each record to be committed individually. Is there a
> way
> >>>> for
> >>>>>> a
> >>>>>>>>>> user
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> optimize this?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated
> acls?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and RPCs,
> it
> >>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>>>>>>> useful
> >>>>>>>>>>>>>>>>>>> to document the upgrade process.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch in
> >> the
> >>>>>>>>>> response
> >>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>> ReadShareGroupState. When it becomes a share-partition
> >>>>>> leader,
> >>>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the things
> it
> >>>>>> learns
> >>>>>>>>>> is
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> current state epoch. Then it uses the state epoch in
> all
> >>>>>>>>>>>> subsequent
> >>>>>>>>>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to
> prevent
> >>>>>>>> writes
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> a previous state epoch, which are very unlikely but
> >> which
> >>>>>>>> would
> >>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>>>>> that a leader was using an out-of-date epoch and was
> >>>> likely
> >>>>>> no
> >>>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>>>>>> the current leader at all, perhaps due to a long pause
> >> for
> >>>>>>>> some
> >>>>>>>>>>>>>>>> reason.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the SPSO,
> >>>>>> wouldn’t
> >>>>>>>> it
> >>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to avoid
> yet
> >>>>>>>> another
> >>>>>>>>>>>>>>>>>>>> inter-broker
> >>>>>>>>>>>>>>>>>>>> hop.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share
> group
> >>>>>>>> offset
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> altered
> >>>>>>>>>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the group
> >>>>>>>> coordinator
> >>>>>>>>>>>> WILL
> >>>>>>>>>>>>>>>>>>>> update the state epoch. I don’t think it needs to
> update
> >>>> the
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>>>>>> at the same time (although it could) because the group
> >>>> epoch
> >>>>>>>>>> will
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> been bumped when the group became empty. If the share
> >>>> group
> >>>>>>>>>> offset
> >>>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains
> empty,
> >> it
> >>>>>>>> would
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to
> >> initialize
> >>>>>> the
> >>>>>>>>>>>> state.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO as a
> >>>> result
> >>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> the usual flow of record delivery, it does not update
> >> the
> >>>>>>>> state
> >>>>>>>>>>>>>> epoch.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the
> >>>> alteration
> >>>>>>>>>>>> because,
> >>>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response will
> >>>>>> contain
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to be
> >> the
> >>>>>>>>>>>>>>>>>>>> last-resort way of catching this.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first
> >>>> ShareFetch
> >>>>>>>>>>>>>> request,
> >>>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to
> >>>>>>>>>>>> ReadShareGroupState.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain
> constant,
> >>>>>> but,
> >>>>>>>>>> when
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might
> >> change.
> >>>>>> As a
> >>>>>>>>>>>>>> result,
> >>>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share
> >>>> sessions
> >>>>>>>>>>>>>>>> transitions
> >>>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the share
> >>>> group
> >>>>>>>>>>>>>>>>>> transitioning
> >>>>>>>>>>>>>>>>>>>> from empty to non-empty, for the share-partition
> leader
> >> to
> >>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state
> epoch.
> >> If
> >>>>>> its
> >>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> epoch is out of date, it can then ReadShareGroupState
> to
> >>>>>>>>>>>>>>>> re-initialize.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset,
> we
> >>>> need
> >>>>>>>> to
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having
> >> reviewed
> >>>>>> all
> >>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should become
> >>>>>>>> FirstOffset
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is
> just
> >>>>>> used
> >>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> record batches, which is already a known concept.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Please let me know if you agree.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level index
> >> for
> >>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users will
> be
> >>>>>> using
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> console share consumer. When I use the console
> >> consumer, I
> >>>>>>>>>> always
> >>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default group
> >> ID
> >>>>>> for
> >>>>>>>>>>>>>> console
> >>>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to match
> the
> >>>>>>>> console
> >>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>> better and give more of an idea where this mysterious
> >>>> group
> >>>>>>>> has
> >>>>>>>>>>>> come
> >>>>>>>>>>>>>>>>>> from.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use
> >> compaction
> >>>>>> and
> >>>>>>>>>> we
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better course
> for
> >>>>>>>> KIP-932.
> >>>>>>>>>>>>>>>>>>>> Personally,
> >>>>>>>>>>>>>>>>>>>> until I’ve written it down and lived with the ideas
> for
> >> a
> >>>>>> few
> >>>>>>>>>>>> days,
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>> won’t be
> >>>>>>>>>>>>>>>>>>>> able to choose which I prefer.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by the
> end
> >> of
> >>>>>>>> this
> >>>>>>>>>>>>>> week.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs
> >> matches
> >>>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from
> >>>>>> KIP-848.
> >>>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The
> >>>>>> ShareGroupHeartbeatResponse
> >>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>> originally
> >>>>>>>>>>>>>>>>>>>> created from KIP-848. This part of the schema does not
> >>>> apply
> >>>>>>>>>> and I
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> removed
> >>>>>>>>>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to
> >> simply
> >>>>>>>>>>>>>>>>>> TopicPartitions
> >>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>> aligns with the actual definition of
> >>>>>>>>>>>> ConsumerGroupHeartbeatResponse.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error codes
> >> for
> >>>>>>>>>>>>>>>>>>>> ShareFetchResponse.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to these
> >> RPCs
> >>>>>> as
> >>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> suggest.
> >>>>>>>>>>>>>>>>>>>> It’s a good improvement for problem determination.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain. Actually, 1
> is
> >>>>>>>>>> Acquired
> >>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>> is not persisted, and I have another non-terminal
> state
> >> in
> >>>>>>>> mind
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of whether
> >>>>>>>>>>>>>>>> OffsetAndMetadata
> >>>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional
> >> Metadata
> >>>>>> part
> >>>>>>>>>>>> comes
> >>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> a regular consumer commits offsets. However, it is
> >> correct
> >>>>>>>> that
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> never be metadata with a share group. I have changed
> >>>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this
> >> process
> >>>>>>>> that
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>> bump
> >>>>>>>>>>>>>>>>>>>> can be a logical not just a physical change to the
> >> schema.
> >>>>>> KIP
> >>>>>>>>>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all of
> >> the
> >>>>>>>> states
> >>>>>>>>>>>> at
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>> point.
> >>>>>>>>>>>>>>>>>>>> I think there is definitely scope for another KIP
> >> focused
> >>>> on
> >>>>>>>>>>>>>>>>>> administration
> >>>>>>>>>>>>>>>>>>>> of share groups that might want this information so
> >>>> someone
> >>>>>>>>>> could
> >>>>>>>>>>>>>>>> build
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> UI and other tools on top. Doing that well is quite a
> >> lot
> >>>> of
> >>>>>>>>>> work
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>> own right
> >>>>>>>>>>>>>>>>>>>> so I would prefer not to do that now.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 109.Yes, they’re inconsistent and follow the consumer
> >>>> groups
> >>>>>>>>>>>>>>>> equivalents
> >>>>>>>>>>>>>>>>>>>> which are also inconsistent. In general, the
> >>>>>>>> KafkaAdmin.delete….
> >>>>>>>>>>>>>>>> Methods
> >>>>>>>>>>>>>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like
> >>>>>>>>>>>>>>>> DeleteShareGroupsResult.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain consistent
> >> with
> >>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>> groups?
> >>>>>>>>>>>>>>>>>>>> Happy to change it.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level of
> >>>> detail
> >>>>>>>>>> about
> >>>>>>>>>>>>>>>>>> epochs.
> >>>>>>>>>>>>>>>>>>>> The MemberDescription does include the assignment, but
> >> not
> >>>>>> the
> >>>>>>>>>>>> list
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> subscribed topic names.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing
> because
> >>>>>>>> there’s
> >>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>> single class that includes the states of all group
> >> types.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 112. I think it’s good practice for the API to have
> >>>>>>>>>>>>>>>>>>>> ListShareGroupOffsetSpec.
> >>>>>>>>>>>>>>>>>>>> It makes evolution and extension of the API much
> easier.
> >>>>>> Also,
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> matches
> >>>>>>>>>>>>>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResults.errors() is the same. I
> >>>> think
> >>>>>>>> you
> >>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> to look in the exception details and the same pattern
> is
> >>>>>> being
> >>>>>>>>>>>>>>>> followed
> >>>>>>>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to writing a
> >>>>>> proposal
> >>>>>>>>>> for
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>> share-group state that doesn’t use log compaction.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine Olshan on
> >>>>>>>>>>>>>> read-committed
> >>>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for the detailed review.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao
> >>>> <j...@confluent.io.INVALID
> >>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the group
> >> epoch
> >>>>>> to
> >>>>>>>>>> set
> >>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
> >>>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group
> coordinator
> >>>>>>>>>>>> initialize
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It seems
> >>>>>> simpler
> >>>>>>>>>> to
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> partition leader initialize both the state and the
> SPSO
> >>>>>>>>>> together?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be
> bumped
> >>>> when
> >>>>>>>> the
> >>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>> offset is altered."
> >>>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group offsets
> says
> >>>>>> "The
> >>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with the
> >> new
> >>>>>>>> state
> >>>>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> __share_group_state  topic." So, which is correct? We
> >>>> have
> >>>>>>>> two
> >>>>>>>>>>>>>> paths
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> update the state in the share coordinator, one from
> the
> >>>>>> group
> >>>>>>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>>>> and another from the partition leader. I thought the
> >>>>>> benefit
> >>>>>>>> of
> >>>>>>>>>>>>>>>> bumping
> >>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>> the epoch is to fence off a late request in the
> >> previous
> >>>>>>>> epoch
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>> path.
> >>>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share
> group
> >>>>>> offset
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>> coordinator, how does the partition leader know the
> >> share
> >>>>>>>> group
> >>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>>> been altered so that it could clear its in-memory
> >> state?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base
> offset
> >>>> for
> >>>>>>>> the
> >>>>>>>>>>>>>> batch
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and matches
> >>>>>>>>>> LastOffset.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in the
> top
> >>>>>> level
> >>>>>>>>>>>> index
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> Kafka protocol changes?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based
> >> expiration
> >>>>>>>>>> later.
> >>>>>>>>>>>>>> But
> >>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>> a default group in console-share-consumer probably
> >> won't
> >>>>>> help
> >>>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> garbage. In the common case, the user of the console
> >>>>>> consumer
> >>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>> wants
> >>>>>>>>>>>>>>>>>>>>> to see the recently produced records for
> verification.
> >> If
> >>>>>> the
> >>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>> doesn't provide that (because of the stale state),
> the
> >>>> user
> >>>>>>>>>> will
> >>>>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>>> just use a new group. It's true that we don't garbage
> >>>>>> collect
> >>>>>>>>>>>> idle
> >>>>>>>>>>>>>>>>>>>> topics.
> >>>>>>>>>>>>>>>>>>>>> However,  the share groups are similar to consumers,
> >>>> which
> >>>>>>>> does
> >>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>> garbage collection. Typically, we read topics more
> than
> >>>>>>>>>> creating
> >>>>>>>>>>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we
> could
> >>>> use
> >>>>>>>>>>>>>>>> customized
> >>>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems
> >> cleaner
> >>>>>> and
> >>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups fit
> in
> >>>>>>>> memory,
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>> generate snapshots more efficiently than going
> through
> >>>>>>>>>>>> compaction.
> >>>>>>>>>>>>>>>>>>>> Having a
> >>>>>>>>>>>>>>>>>>>>> separate log per share partition is probably too much
> >>>>>>>> overhead.
> >>>>>>>>>>>>>> It's
> >>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>> efficient to put the state changes for multiple share
> >>>>>>>>>> partitions
> >>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>> single log.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs:
> >>>> Should
> >>>>>> we
> >>>>>>>>>>>> name
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>> SessionTimeoutMs?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error:
> What
> >>>>>> kind
> >>>>>>>> of
> >>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>> we have when assigning partitions? What are the
> >>>>>> corresponding
> >>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>> codes?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 102. Do we still need
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for
> >>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the errorCode in
> >>>>>>>>>>>>>>>>>> ShareFetchResponse,
> >>>>>>>>>>>>>>>>>>>>> ShareAcknowledgeResponse,
> ReadShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse,
> >>>>>> DeleteShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and
> >>>>>>>>>>>>>>>>>> InitializeShareGroupStateResponse?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 105. "about": "The state -
> >>>>>> 0:Available,2:Acked,4:Archived.":
> >>>>>>>>>> What
> >>>>>>>>>>>>>>>>>> about 1
> >>>>>>>>>>>>>>>>>>>>> and 3? Are we leaving them out intentionally?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in
> OffsetAndMetadata?
> >>>> If
> >>>>>>>> not,
> >>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> remove it from AdminClient and KafkaShareConsumer?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the version
> >>>> since
> >>>>>>>> it
> >>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>> supports
> >>>>>>>>>>>>>>>>>>>>> a new group type "share"?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it
> >> expose
> >>>>>> all
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> states
> >>>>>>>>>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just
> SPSO?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
> >>>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final
> >>>>>> TopicPartition
> >>>>>>>>>>>>>>>>>> partition)
> >>>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes
> >>>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
> >>>>>>>>>>>>>>>>>>>>> Should we make them more consistent?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields like
> >>>>>>>>>> GroupEpoch,
> >>>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and
> SubscribedTopicNames?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we
> >> just
> >>>>>> use
> >>>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know
> >> which
> >>>>>>>> group
> >>>>>>>>>>>> has
> >>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>> error?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the
> Share
> >>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>> RPCs.
> >>>>>>>>>>>>>>>>>>>>>> In the general case, it’s an inter-broker call. It
> is
> >>>>>>>> probably
> >>>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> optimise
> >>>>>>>>>>>>>>>>>>>>>> for the situation in which the appropriate GC and SC
> >>>>>> shards
> >>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>> co-located, but the
> >>>>>>>>>>>>>>>>>>>>>> KIP does not delve that deep into potential
> >> performance
> >>>>>>>>>>>>>>>> optimisations.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, but I
> do
> >>>>>> worry
> >>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>> retrospectively
> >>>>>>>>>>>>>>>>>>>>>> introducing a naming convention for groups. I feel
> >> that
> >>>>>>>> naming
> >>>>>>>>>>>>>>>>>>>> conventions
> >>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> typically be the responsibility of the cluster
> >>>>>>>> administrators
> >>>>>>>>>>>>>> based
> >>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> organizational
> >>>>>>>>>>>>>>>>>>>>>> factors, such as the name of an application.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID
> because
> >>>> the
> >>>>>>>>>> group
> >>>>>>>>>>>> ID
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> correct but
> >>>>>>>>>>>>>>>>>>>>>> the group is the wrong type. The nearest existing
> >> error
> >>>>>> code
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> gets
> >>>>>>>>>>>>>>>>>>>>>> that across
> >>>>>>>>>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is
> really
> >>>>>>>> showing
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>> error code would be better.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have removed
> >>>> them.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every change,
> but
> >>>>>> only a
> >>>>>>>>>>>>>> subset
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> relevant
> >>>>>>>>>>>>>>>>>>>>>> for rebalancing. A check is done against the names
> of
> >>>> the
> >>>>>>>>>>>>>> subscribed
> >>>>>>>>>>>>>>>>>>>>>> topics to
> >>>>>>>>>>>>>>>>>>>>>> see if any relevant changes may have occurred. Then
> >> the
> >>>>>>>>>> changes
> >>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>> trigger
> >>>>>>>>>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change in
> >>>>>>>>>> partitions,
> >>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>> rack
> >>>>>>>>>>>>>>>>>>>>>> IDs for the
> >>>>>>>>>>>>>>>>>>>>>> replicas. I have updated the KIP to make this more
> >>>> clear.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it is
> much
> >>>>>> less
> >>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> assignment survives a GC change. There’s no need to
> >>>>>> transfer
> >>>>>>>>>>>>>>>>>> partitions
> >>>>>>>>>>>>>>>>>>>>>> safely from
> >>>>>>>>>>>>>>>>>>>>>> member to member in the way that is required for
> >>>> consumer
> >>>>>>>>>>>> groups,
> >>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>> optimisation, the assignments for a share group are
> >> not
> >>>>>>>>>>>> persisted.
> >>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>> wouldn’t do any
> >>>>>>>>>>>>>>>>>>>>>> harm, but it just seems unnecessary.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to
> acknowledge
> >> a
> >>>>>>>> record
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>>>>>>>> has the right to acknowledge, the
> INVALID_RECORD_STATE
> >>>>>> error
> >>>>>>>>>>>> code
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> used.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If the application uses the
> >>>> KafkaShareConsumer.commitSync
> >>>>>>>>>>>> method,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> see an InvalidRecordState exception returned.
> >>>>>> Alternatively,
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> application can
> >>>>>>>>>>>>>>>>>>>>>> register an acknowledgement commit callback which
> will
> >>>> be
> >>>>>>>>>> called
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> status
> >>>>>>>>>>>>>>>>>>>>>> of the acknowledgements that have succeeded or
> failed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with the
> >>>> durable
> >>>>>>>>>>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>>>>>>>> state in this
> >>>>>>>>>>>>>>>>>>>>>> KIP. The significant choices I made are that:
> >>>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is replicated
> >>>> between
> >>>>>>>>>>>> brokers.
> >>>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the
> storage.
> >>>>>>>>>>>>>>>>>>>>>> * Only one topic is required.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for this
> kind
> >>>> of
> >>>>>>>>>> data,
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> evidenced by
> >>>>>>>>>>>>>>>>>>>>>> the DeltaIndex technique I employed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to
> improve
> >>>>>> upon
> >>>>>>>>>> it.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log
> >>>> compactor
> >>>>>>>> for
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and
> >> ShareDelta
> >>>>>>>>>>>> records.
> >>>>>>>>>>>>>>>>>>>>>> Essentially,
> >>>>>>>>>>>>>>>>>>>>>> for each share-partition, the latest ShareCheckpoint
> >> and
> >>>>>> any
> >>>>>>>>>>>>>>>>>> subsequent
> >>>>>>>>>>>>>>>>>>>>>> ShareDelta
> >>>>>>>>>>>>>>>>>>>>>> records must not be cleaned. Anything else can be
> >>>> cleaned.
> >>>>>>>> We
> >>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> be sure
> >>>>>>>>>>>>>>>>>>>>>> that multiple ShareDelta records with the same key
> >> would
> >>>>>>>>>> survive
> >>>>>>>>>>>>>>>>>>>> cleaning
> >>>>>>>>>>>>>>>>>>>>>> and we could
> >>>>>>>>>>>>>>>>>>>>>> abandon the DeltaIndex technique.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per
> >>>> share-partition.
> >>>>>>>>>> Let’s
> >>>>>>>>>>>>>>>>>> imagine
> >>>>>>>>>>>>>>>>>>>>>> that we had
> >>>>>>>>>>>>>>>>>>>>>> a share-state topic per topic being consumed in a
> >> share
> >>>>>>>> group,
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> same number
> >>>>>>>>>>>>>>>>>>>>>> of partitions as the topic being consumed. We could
> >>>> write
> >>>>>>>> many
> >>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>> deltas
> >>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints to
> >> keep
> >>>>>>>>>> control
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> storage used.
> >>>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use
> >>>>>>>>>>>>>>>>>>>> KafkaAdmin.deleteRecords()
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> prune all of the older records.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous, but
> we
> >>>> are
> >>>>>>>>>>>> talking
> >>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>> per share group that it’s being consumed in. These
> >>>> topics
> >>>>>>>>>> would
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> compacted.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister interface is
> >>>>>>>> intended
> >>>>>>>>>> to
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> pluggable one day.
> >>>>>>>>>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. It seems
> >>>> likely
> >>>>>>>> to
> >>>>>>>>>> me
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> future KIPs will
> >>>>>>>>>>>>>>>>>>>>>> improve upon it.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to
> >> change
> >>>>>> this
> >>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>> While
> >>>>>>>>>>>>>>>>>>>>>> option (a) is
> >>>>>>>>>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack to
> >> have
> >>>> a
> >>>>>>>>>>>>>> customised
> >>>>>>>>>>>>>>>>>> log
> >>>>>>>>>>>>>>>>>>>>>> compactor
> >>>>>>>>>>>>>>>>>>>>>> just for this topic.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State is
> >>>>>>>> overloaded.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 79. See (77).
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <
> >>>>>>>>>>>> david.art...@confluent.io
> >>>>>>>>>>>>>>>>>>>> .INVALID>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty
> exciting
> >>>>>>>> effort.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still trying
> to
> >>>>>> grok
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> whole
> >>>>>>>>>>>>>>>>>>>>>> thing.
> >>>>>>>>>>>>>>>>>>>>>>> Sorry if some of my questions are basic :)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Concepts:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with the
> >>>> Share
> >>>>>>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>>> RPC or directly in-process?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular
> >>>> consumer
> >>>>>>>>>>>> groups,
> >>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the
> >>>> operator
> >>>>>>>>>>>> defines
> >>>>>>>>>>>>>>>>>> "sg_"
> >>>>>>>>>>>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>>>>>>> prefix for share groups only, and if a regular
> >> consumer
> >>>>>>>> group
> >>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>> that name it fails.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group, or
> a
> >>>>>> share
> >>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID make
> >>>> more
> >>>>>>>>>> sense
> >>>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Share Group Membership:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 73. What goes in the Metadata field for
> >>>>>>>>>> TargetAssignment#Member
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> Assignment?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we rebalance
> >>>> when
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>>>>> metadata changes. Would this be for any change, or
> >> just
> >>>>>>>>>> certain
> >>>>>>>>>>>>>>>> ones?
> >>>>>>>>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>>>>>>> example, if a follower drops out of the ISR and
> comes
> >>>>>> back,
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>>>>>> don't need to rebalance.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator does
> >>>> *not*
> >>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> assignment" Can you explain why this is not needed?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 76. " If the consumer just failed to heartbeat due
> >> to a
> >>>>>>>>>>>> temporary
> >>>>>>>>>>>>>>>>>>>> pause,
> >>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> could in theory continue to fetch and acknowledge
> >>>>>> records.
> >>>>>>>>>> When
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> finally
> >>>>>>>>>>>>>>>>>>>>>>> sends a heartbeat and realises it’s been kicked out
> >> of
> >>>>>> the
> >>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>> stop fetching records because its assignment has
> been
> >>>>>>>>>> revoked,
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> rejoin
> >>>>>>>>>>>>>>>>>>>>>>> the group."
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still deliver
> some
> >>>>>>>>>> buffered
> >>>>>>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>> if the share group coordinator has expired its
> >> session,
> >>>>>> it
> >>>>>>>>>>>>>> wouldn't
> >>>>>>>>>>>>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>>>>>>>> acknowledgments for that share consumer. In such a
> >>>> case,
> >>>>>> is
> >>>>>>>>>> any
> >>>>>>>>>>>>>>>> kind
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> error raised to the application like "hey, I know
> we
> >>>> gave
> >>>>>>>> you
> >>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>> records, but really we shouldn't have" ?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -----
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is
> written
> >>>> at
> >>>>>>>>>> least
> >>>>>>>>>>>>>>>> every
> >>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>> often, could we add a new log compactor that avoids
> >>>>>>>>>> compacting
> >>>>>>>>>>>>>>>>>>>>>> ShareDelta-s
> >>>>>>>>>>>>>>>>>>>>>>> that are still "active" (i.e., not yet superceded
> by
> >> a
> >>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>> ShareCheckpoint). Mechnically, this could be done
> by
> >>>>>>>> keeping
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> LSO
> >>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>> greater than the oldest "active" ShareCheckpoint.
> >> This
> >>>>>>>> might
> >>>>>>>>>>>> let
> >>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>>>> remove
> >>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex thing.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the
> ShareDelta/Checkpoint
> >>>>>>>>>> records,
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>> MessageState? (State is kind of
> overloaded/ambiguous)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current
> >>>> persistence
> >>>>>>>>>> model
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>> the share state is stored in one topic. It seems
> like
> >>>> we
> >>>>>>>> are
> >>>>>>>>>>>>>> going
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>> storing a lot more state than we do in
> >>>> __consumer_offsets
> >>>>>>>>>> since
> >>>>>>>>>>>>>>>> we're
> >>>>>>>>>>>>>>>>>>>>>>> dealing with message-level acks. With aggressive
> >>>>>>>>>> checkpointing
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> compaction, we can mitigate the storage
> requirements,
> >>>> but
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> throughput
> >>>>>>>>>>>>>>>>>>>>>>> could be a limiting factor. Have we considered
> other
> >>>>>>>>>>>>>> possibilities
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> persistence?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Reply via email to