Hi, Andrew,

Thanks for the reply.

123. "The share group does not persist the target assignment."
  What's the impact of this? Every time the GC fails over, it needs to
recompute the assignment for every member. Do we expect the member
assignment to change on every GC failover?

125. Should the GC also write ShareGroupPartitionMetadata?

127. So, the group epoch is only propagated to the SC when an
InitializeShareGroupState request is sent. This sounds good.

130. Should we have a group.share.min.record.lock.duration.ms to pair with
group.share.max.record.lock.duration.ms?

131. Sounds good. The name group.share.record.lock.partition.limit doesn't
seem very intuitive. How about something
like group.share.partition.max.records.pending.ack?

136. Could we describe the process of GC failover? I guess it needs to
compute member reassignment and check if there is any new topic/partition
matching the share subscription. Does it bump up the group epoch?

137. Metrics:
137.1 It would be useful to document who reports each metric. Is it any
broker, GC, SC or SPL?
137.2 partition-load-time: Is that the loading time at SPL or SC?
137.3 "The time taken in milliseconds to load the share-group state from
the share-group state partitions loaded in the last 30 seconds."
  The window depends on metrics.num.samples and metrics.sample.window.ms
and is not always 30 seconds, right?
137.4 Could you explain write/write-latency a bit more? Does it include the
time to write to the internal topic?

Jun

On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Jun,
> Thanks for your comments.
>
> 120. Thanks. Fixed.
>
> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
> the update applies to. It should of course be the snapshot that precedes
> it in the log. It’s just there to provide a consistency check.
>
> I also noticed that ShareSnapshotValue was missing StateEpoch. It
> isn’t any more.
>
> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
> GroupEpoch, but in the code it does not. In fact, there is considerable
> divergence between the KIP and the code for this record value schema
> which I expect will be resolved when the migration code has been
> completed.
>
> 123. The share group does not persist the target assignment.
>
> 124. Share groups have three kinds of record:
> i) ShareGroupMetadata
>   - this contains the group epoch and is written whenever the group
>     epoch changes.
> ii) ShareGroupMemberMetadata
>    - this does not contain the group epoch.
> iii) ShareGroupPartitionMetadata
>    - this currently contains the epoch, but I think that is unnecessary.
>      For one thing, the ConsumerGroupPartitionMetadata definition
>      contains the group epoch, but the value appears never to be set.
>      David Jacot confirms that it’s not necessary and is removing it.
>
> I have removed the Epoch from ShareGroupPartitionMetadata.
> The only purpose of persisting the epoch for a share group is so that
> when a group coordinator takes over the share group, it is able to
> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
> is used for this.
>
> 125. The group epoch will be incremented in this case and
> consequently a ShareGroupMetadata will be written. KIP updated.
>
> 126. Not directly. A share group can only be deleted when it has no
> members, so the tombstones for ShareGroupMemberMetadata will
> have been written when the members left. I have clarified this.
>
> 127. The share coordinator is ignorant of the group epoch. When the
> group coordinator is initializing the share-group state the first time that
> a share-partition is being added to an assignment in the group, the
> group epoch is used as the state epoch. But as the group epoch
> increases over time, the share coordinator is entirely unaware.
>
> When the first consumer for a share-partition fetches records from a
> share-partition leader, the SPL calls the share coordinator to
> ReadShareGroupState. If the SPL has previously read the information and the
> share-partition is again going from 0 to 1 consumers, it confirms that it is
> still up to date by calling ReadShareGroupOffsetsState.
>
> Even if many consumers are joining at the same time, any share-partition
> which is being initialized will not be included in their assignments. Once
> the initialization is complete, the next rebalance will assign the partition
> to some consumers, which will discover this from the ShareGroupHeartbeat
> response. And then the fetching begins.
>
> If an SPL receives a ShareFetch request before it’s read the state
> from the SC, it can make the ShareFetch request wait up to MaxWaitMs
> and then it can return an empty set of records if it’s still not ready.
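>
> As a rough sketch of that gating (the class and method names here are
> illustrative, not from the KIP):
>
>   // Hypothetical SPL logic; SharePartition and the helper methods are made
>   // up for illustration, but the behaviour matches the description above.
>   if (!sharePartition.stateInitialized()) {
>     // Wait up to MaxWaitMs for the ReadShareGroupState response to be applied.
>     boolean ready = sharePartition.awaitStateInitialized(request.maxWaitMs());
>     if (!ready) {
>       return emptyShareFetchResponse(request);  // no error, just no records yet
>     }
>   }
>   return fetchAcquiredRecords(sharePartition, request);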
>
> So, I don’t believe there will be too much load. If a topic with many
> partitions is added to the subscribed topics for a share group, the fact
> that the assignments will only start to include the partitions as their
> initialization completes should soften the impact.
>
> 128, 129: The “proper” way to turn on this feature when it’s finished will
> be using `group.coordinator.rebalance.protocols` and `group.version`.
> While it’s in Early Access and for test cases, the `group.share.enable`
> configuration will turn it on.
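>
> For example, in Early Access something like this in the broker configuration
> is enough to turn it on (illustrative only):
>
>   group.share.enable=true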
>
> I have described `group.share.enable` as an internal configuration in
> the KIP.
>
> 130. The config `group.share.record.lock.duration.ms` applies to groups
> which do not specify a group-level configuration for lock duration. The
> minimum and maximum for this configuration are intended to give it
> sensible bounds.
>
> If a group does specify its own `group.share.record.lock.duration.ms`,
> the broker-level `group.share.max.record.lock.duration.ms` gives the
> cluster administrator a way of setting a maximum value for all groups.
>
> While editing, I renamed `group.share.record.lock.duration.max.ms` to
> `group.share.max.record.lock.duration.ms` for consistency with the
> rest of the min/max configurations.
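>
> So, with illustrative values, the broker-level configuration looks something
> like this:
>
>   group.share.record.lock.duration.ms=30000      # default for groups which set nothing
>   group.share.max.record.lock.duration.ms=60000  # cap on any group-level override
>
> and a group which sets its own group.share.record.lock.duration.ms can choose
> any value up to that cap.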
>
> 131. This is the limit per partition so you can go wider with multiple
> partitions. I have set the initial value low for safety. I expect to be able
> to increase this significantly when we have mature code which has been
> battle-tested. Rather than try to guess how high it can safely go, I’ve erred
> on the side of caution and expect to open it up in a future KIP.
>
> 132. Good catch. The problem is that I had missed two group configurations,
> now added. These are group.share.session.timeout.ms and
> group.share.heartbeat.interval.ms. The configurations you mentioned
> are the bounds for these group-level configurations.
>
> 133. The name `group.share.max.size` was chosen to mirror the existing
> `group.consumer.max.size`.
>
> 134. It is intended to be a list of all of the valid assignors for the
> cluster. When the assignors are configurable at the group level, the group
> configuration will only be permitted to name an assignor which is in this
> list. For now, there is no group configuration for assignor, so all groups
> get the one and only assignor in the list.
>
> 135. It’s the number of threads per broker. For a cluster with a small
> number of brokers and a lot of share group activity, it may be appropriate
> to increase this. We will be able to give tuning advice once we have
> experience of the performance impact of increasing it.
>
> Thanks,
> Andrew
>
>
> > On 20 Apr 2024, at 00:14, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply. A few more comments.
> >
> > 120. There is still reference to ConsumerGroupMetadataKey.
> >
> > 121. ShareUpdateValue.SnapshotEpoch: Should we change it since it's not a
> > snapshot?
> >
> > 122. ConsumerGroupMemberMetadataValue includes epoch, but
> > ShareGroupMemberMetadataValue does not. Do we know how the epoch is being
> > used in the consumer group and whether it's needed in the share group?
> >
> > 123. There is no equivalent of ConsumerGroupTargetAssignmentMember for
> > ShareGroup. How does the shareGroup persist the member assignment?
> >
> > 124. Assign a share-partition: "When a topic-partition is assigned to a
> > member of a share group for the first time, the group coordinator writes
> a
> > ShareGroupPartitionMetadata record to the __consumer_offsets  topic".
> >  When does the coordinator write ShareGroupMetadata with the bumped
> epoch?
> > In general, is there a particular order to bump up the epoch in different
> > records? When can the new epoch be exposed to the sharePartitionLeader?
> It
> > would be useful to make this clear in other state changing operations
> too.
> >
> > 125. "Alter share group offsets Group coordinator and share coordinator
> > Only empty share groups support this operation. The group coordinator
> sends
> > an InitializeShareGroupState  request to the share coordinator. The share
> > coordinator writes a ShareSnapshot record with the new state epoch to the
> > __share_group_state  topic."
> >  Does the operation need to write ShareGroupPartitionMetadata and
> > ShareGroupMetadata (for the new epoch)?
> >
> > 126. Delete share group: Does this operation also need to write a
> tombstone
> > for the ShareGroupMemberMetadata record?
> >
> > 127. I was thinking about the impact on a new consumer joining the
> > shareGroup. This causes the GroupCoordinator and the ShareCoordinator to
> > bump up the group epoch, which in turn causes the SharePartition leader
> to
> > reinitialize the state with ReadShareGroupOffsetsState. If many consumers
> > are joining a shareGroup around the same time, would there be too much
> load
> > for the ShareCoordinator and SharePartition leader?
> >
> > 128. Since group.share.enable will be replaced by the group.version
> > feature, should we make it an internal config?
> >
> > 129. group.coordinator.rebalance.protocols: this seems redundant with
> > group.share.enable?
> >
> > 130. Why do we have both group.share.record.lock.duration.ms and
> > group.share.record.lock.duration.max.ms, each with its own max value?
> >
> > 131. group.share.record.lock.partition.limit defaults to 200. This limits
> > the max degree of consumer parallelism to 200, right? If there are
> multiple
> > records per batch, it could be even smaller.
> >
> > 132. Why do we need all three of group.share.session.timeout.ms,
> > group.share.min.session.timeout.ms and
> group.share.max.session.timeout.ms?
> > Session timeout can't be set by the client. Ditto for
> > group.share.heartbeat.interval.ms.
> >
> > 133. group.share.max.size: Would group.share.max.members.per.group be a
> > more intuitive name?
> >
> > 134. group.share.assignors: Why does it need to be a list?
> >
> > 135. share.coordinator.threads: Is that per share coordinator or per
> broker?
> >
> > Jun
> >
> > On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for your reply.
> >>
> >> 42.1. That’s a sensible improvement. Done.
> >>
> >> 47,56. Done. All instances of BaseOffset changed to FirstOffset.
> >>
> >> 105. I think that would be in a future KIP. Personally, I don’t mind
> having
> >> a non-contiguous set of values in this KIP.
> >>
> >> 114. Done.
> >>
> >> 115. If the poll is just returning a single record because there is not
> >> much
> >> data to consume, committing on every record is OK. It’s inefficient but
> >> acceptable.
> >> If the first poll returns just one record, but many more have piled up
> >> while
> >> the first one was being processed, the next poll has the opportunity to
> >> return
> >> a bunch of records and then these will be able to be committed together.
> >> So, my answer is that the approach we will take here is the optimisation
> >> on the broker to return batches of records when the records are available.
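> >>
> >> From the application’s point of view, the pattern I have in mind looks
> >> roughly like this, assuming a configured KafkaShareConsumer and using the
> >> consumer API proposed in the KIP (process() is just a placeholder):
> >>
> >>   // Sketch only; based on the KafkaShareConsumer API proposed in the KIP.
> >>   consumer.subscribe(List.of("my-topic"));
> >>   while (true) {
> >>     // A single poll may return many records if they have piled up.
> >>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
> >>     for (ConsumerRecord<String, String> record : records) {
> >>       process(record);                                   // placeholder
> >>       consumer.acknowledge(record, AcknowledgeType.ACCEPT);
> >>     }
> >>     consumer.commitSync();  // commits all acknowledgements from this poll together
> >>   }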
> >>
> >> 116. Good idea. Done.
> >>
> >> 117. I’ve rewritten the Kafka Broker Migration section. Let me know what
> >> you think.
> >> I am discussing the configuration to enable the feature in the mailing
> >> list with
> >> David Jacot also, so I anticipate a bit of change in this area still.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 15 Apr 2024, at 23:34, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Hi, Andrew,
> >>>
> >>> Thanks for the updated KIP.
> >>>
> >>> 42.1 "If the share group offset is altered multiple times when the
> group
> >>> remains empty, it would be harmless if the same state epoch was reused
> to
> >>> initialize the state."
> >>> Hmm, how does the partition leader know for sure that it has received
> the
> >>> latest share group offset if epoch is reused?
> >>> Could we update the section "Group epoch - Trigger a rebalance" that
> >>> AdminClient.alterShareGroupOffsets causes the group epoch to be bumped
> >> too?
> >>>
> >>> 47,56 "my view is that BaseOffset should become FirstOffset in ALL
> >> schemas
> >>> defined in the KIP."
> >>> Yes, that seems better to me.
> >>>
> >>> 105. "I have another non-terminal state in mind for 3."
> >>> Should we document it?
> >>>
> >>> 114. session.timeout.ms in the consumer configuration is deprecated in
> >>> KIP-848. So, we need to remove it from the shareConsumer configuration.
> >>>
> >>> 115. I am wondering if it's a good idea to always commit acks on
> >>> ShareConsumer.poll(). In the extreme case, each batch may only contain
> a
> >>> single record and each poll() only returns a single batch. This will
> >> cause
> >>> each record to be committed individually. Is there a way for a user to
> >>> optimize this?
> >>>
> >>> 116. For each new RPC, could we list the associated acls?
> >>>
> >>> 117. Since this KIP changes internal records and RPCs, it would be
> useful
> >>> to document the upgrade process.
> >>>
> >>> Jun
> >>>
> >>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
> >> andrew_schofi...@live.com>
> >>> wrote:
> >>>
> >>>> Hi Jun,
> >>>> Thanks for your questions.
> >>>>
> >>>> 41.
> >>>> 41.1. The partition leader obtains the state epoch in the response
> from
> >>>> ReadShareGroupState. When it becomes a share-partition leader,
> >>>> it reads the share-group state and one of the things it learns is the
> >>>> current state epoch. Then it uses the state epoch in all subsequent
> >>>> calls to WriteShareGroupState. The fencing is to prevent writes for
> >>>> a previous state epoch, which are very unlikely but which would mean
> >>>> that a leader was using an out-of-date epoch and was likely no longer
> >>>> the current leader at all, perhaps due to a long pause for some
> reason.
> >>>>
> >>>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
> >>>> to discover the initial offset? I’m trying to avoid yet another
> >>>> inter-broker
> >>>> hop.
> >>>>
> >>>> 42.
> >>>> 42.1. I think I’ve confused things. When the share group offset is
> >> altered
> >>>> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
> >>>> update the state epoch. I don’t think it needs to update the group
> epoch
> >>>> at the same time (although it could) because the group epoch will have
> >>>> been bumped when the group became empty. If the share group offset
> >>>> is altered multiple times when the group remains empty, it would be
> >>>> harmless if the same state epoch was reused to initialize the state.
> >>>>
> >>>> When the share-partition leader updates the SPSO as a result of
> >>>> the usual flow of record delivery, it does not update the state epoch.
> >>>>
> >>>> 42.2. The share-partition leader will notice the alteration because,
> >>>> when it issues WriteShareGroupState, the response will contain the
> >>>> error code FENCED_STATE_EPOCH. This is supposed to be the
> >>>> last-resort way of catching this.
> >>>>
> >>>> When the share-partition leader handles its first ShareFetch request,
> >>>> it learns the state epoch from the response to ReadShareGroupState.
> >>>>
> >>>> In normal running, the state epoch will remain constant, but, when
> there
> >>>> are no consumers and the group is empty, it might change. As a result,
> >>>> I think it would be sensible when the set of share sessions
> transitions
> >>>> from 0 to 1, which is a reasonable proxy for the share group
> >> transitioning
> >>>> from empty to non-empty, for the share-partition leader to issue
> >>>> ReadShareGroupOffsetsState to validate the state epoch. If its state
> >>>> epoch is out of date, it can then ReadShareGroupState to
> re-initialize.
> >>>>
> >>>> I’ve changed the KIP accordingly.
> >>>>
> >>>> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
> >>>> a clear view of which is the correct term. Having reviewed all of the
> >>>> instances, my view is that BaseOffset should become FirstOffset in
> >>>> ALL schemas defined in the KIP. Then, BaseOffset is just used in
> >>>> record batches, which is already a known concept.
> >>>>
> >>>> Please let me know if you agree.
> >>>>
> >>>> 60. I’ve added FindCoordinator to the top level index for protocol
> >> changes.
> >>>>
> >>>> 61. OK. I expect you are correct about how users will be using the
> >>>> console share consumer. When I use the console consumer, I always get
> >>>> a new consumer group. I have changed the default group ID for console
> >>>> share consumer to “console-share-consumer” to match the console
> consumer
> >>>> better and give more of an idea where this mysterious group has come
> >> from.
> >>>>
> >>>> 77. I will work on a proposal that does not use compaction and we can
> >>>> make a judgement about whether it’s a better course for KIP-932.
> >>>> Personally,
> >>>> until I’ve written it down and lived with the ideas for a few days, I
> >>>> won’t be
> >>>> able to choose which I prefer.
> >>>>
> >>>> I should be able to get the proposal written by the end of this week.
> >>>>
> >>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
> >>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
> >>>> I prefer to maintain the consistency.
> >>>>
> >>>> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was
> >>>> originally
> >>>> created from KIP-848. This part of the schema does not apply and I
> have
> >>>> removed
> >>>> it. I have also renamed AssignedTopicPartitions to simply
> >> TopicPartitions
> >>>> which
> >>>> aligns with the actual definition of ConsumerGroupHeartbeatResponse.
> >>>>
> >>>> 102. No, I don’t think we do. Removed.
> >>>>
> >>>> 103. I’ve changed the description for the error codes for
> >>>> ShareFetchResponse.
> >>>>
> >>>> 104. Interesting. I have added ErrorMessages to these RPCs as you
> >> suggest.
> >>>> It’s a good improvement for problem determination.
> >>>>
> >>>> 105. The values are reserved in my brain. Actually, 1 is Acquired
> which
> >>>> is not persisted, and I have another non-terminal state in mind for 3.
> >>>>
> >>>> 106. A few people have raised the question of whether
> OffsetAndMetadata
> >>>> is sensible in this KIP, given that the optional Metadata part comes
> >> from
> >>>> when
> >>>> a regular consumer commits offsets. However, it is correct that there
> >> will
> >>>> never be metadata with a share group. I have changed
> >>>> the KIP to replace OffsetAndMetadata with Long.
> >>>>
> >>>> 107. Yes, you are right. I have learnt during this process that a
> >> version
> >>>> bump
> >>>> can be a logical not just a physical change to the schema. KIP
> updated.
> >>>>
> >>>> 108. I would prefer not to extend this RPC for all of the states at
> this
> >>>> point.
> >>>> I think there is definitely scope for another KIP focused on
> >> administration
> >>>> of share groups that might want this information so someone could
> build
> >> a
> >>>> UI and other tools on top. Doing that well is quite a lot of work in
> its
> >>>> own right
> >>>> so I would prefer not to do that now.
> >>>>
> >>>> 109. Yes, they’re inconsistent and follow the consumer groups
> equivalents
> >>>> which are also inconsistent. In general, the KafkaAdmin.delete….
> Methods
> >>>> use the Map<XXX, KafkaFuture<YYY>> pattern like
> DeleteShareGroupsResult.
> >>>>
> >>>> Would you prefer that I do that, or remain consistent with consumer
> >> groups?
> >>>> Happy to change it.
> >>>>
> >>>> 110. Again, consumer groups don’t yield that level of detail about
> >> epochs.
> >>>> The MemberDescription does include the assignment, but not the list of
> >>>> subscribed topic names.
> >>>>
> >>>> 111. I didn’t include GroupState in GroupListing because there’s no
> >>>> single class that includes the states of all group types.
> >>>>
> >>>> 112. I think it’s good practice for the API to have
> >>>> ListShareGroupOffsetSpec.
> >>>> It makes evolution and extension of the API much easier. Also, it
> >> matches
> >>>> the precedent set by ListConsumerGroupOffsetSpec.
> >>>>
> >>>> 113. ListConsumerGroupsResults.errors() is the same. I think you just
> >> have
> >>>> to look in the exception details and the same pattern is being
> followed
> >>>> here.
> >>>>
> >>>>
> >>>> Over the next few days, I have committed to writing a proposal for how
> >> to
> >>>> persist
> >>>> share-group state that doesn’t use log compaction.
> >>>>
> >>>> I am also aware that discussion with Justine Olshan on read-committed
> >>>> isolation level is not yet quite complete.
> >>>>
> >>>> Thanks for the detailed review.
> >>>>
> >>>> Andrew
> >>>>
> >>>>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Hi, Andrew,
> >>>>>
> >>>>> Thanks for the reply. A few more comments.
> >>>>>
> >>>>> 41.
> >>>>> 41.1 How does the partition leader obtain the group epoch to set
> >>>>> WriteShareGroupStateRequest.StateEpoch?
> >>>>> 41.2 What's the benefit of having the group coordinator initialize
> the
> >>>>> state and the partition leader set the SPSO? It seems simpler to have
> >> the
> >>>>> partition leader initialize both the state and the SPSO together?
> >>>>>
> >>>>> 42.
> >>>>> 42.1 "I don’t think the group epoch needs to be bumped when the share
> >>>> group
> >>>>> offset is altered."
> >>>>> But the part on handling Alter share group offsets says "The share
> >>>>> coordinator writes a ShareCheckpoint record with the new state epoch
> to
> >>>> the
> >>>>> __share_group_state  topic." So, which is correct? We have two paths
> to
> >>>>> update the state in the share coordinator, one from the group
> >> coordinator
> >>>>> and another from the partition leader. I thought the benefit of
> bumping
> >>>> up
> >>>>> the epoch is to fence off a late request in the previous epoch from
> >>>> another
> >>>>> path.
> >>>>> 42.2 When the group coordinator alters the share group offset in
> share
> >>>>> coordinator, how does the partition leader know the share group state
> >> has
> >>>>> been altered so that it could clear its in-memory state?
> >>>>>
> >>>>> 47. 56. BaseOffset typically refers to the base offset for the batch
> >> and
> >>>>> can be confusing. FirstOffset is clearer and matches LastOffset.
> >>>>>
> >>>>> 60. Could we include FindCoordinatorRequest in the top level index
> for
> >>>>> Kafka protocol changes?
> >>>>>
> >>>>> 61. I think it's probably ok to add time-based expiration later. But
> >>>> using
> >>>>> a default group in console-share-consumer probably won't help reduce
> >> the
> >>>>> garbage. In the common case, the user of the console consumer likely
> >>>> wants
> >>>>> to see the recently produced records for verification. If the default
> >>>> group
> >>>>> doesn't provide that (because of the stale state), the user will
> likely
> >>>>> just use a new group. It's true that we don't garbage collect idle
> >>>> topics.
> >>>>> However,  the share groups are similar to consumers, which does
> support
> >>>>> garbage collection. Typically, we read topics more than creating
> them.
> >>>>>
> >>>>> 77. If the generic compaction is inconvenient, we could use
> customized
> >>>>> logic. If we go with that route, option (b) seems cleaner and more
> >>>>> optimized. Since the share states for all groups fit in memory, we
> >> could
> >>>>> generate snapshots more efficiently than going through compaction.
> >>>> Having a
> >>>>> separate log per share partition is probably too much overhead. It's
> >> more
> >>>>> efficient to put the state changes for multiple share partitions in a
> >>>>> single log.
> >>>>>
> >>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
> >>>>> SessionTimeoutMs?
> >>>>>
> >>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error
> >>>> could
> >>>>> we have when assigning partitions? What are the corresponding error
> >>>> codes?
> >>>>>
> >>>>> 102. Do we still need
> >>>>>
> >>>>
> >>
> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
> >>>>>
> >>>>> 103. Could we list the error codes separately for
> >>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
> >>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
> >>>>>
> >>>>> 104. Should we add error message for the errorCode in
> >> ShareFetchResponse,
> >>>>> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
> >>>>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
> >>>>> ReadShareGroupOffsetsStateResponse and
> >> InitializeShareGroupStateResponse?
> >>>>>
> >>>>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What
> >> about 1
> >>>>> and 3? Are we leaving them out intentionally?
> >>>>>
> >>>>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could
> >> we
> >>>>> remove it from AdminClient and KafkaShareConsumer?
> >>>>>
> >>>>> 107. ListGroupsRequest: Should we bump up the version since it now
> >>>> supports
> >>>>> a new group type "share"?
> >>>>>
> >>>>> 108. AdminClient.listShareGroupOffsets: Should it expose all the
> states
> >>>>> from ReadShareGroupStateResponse, instead of just SPSO?
> >>>>>
> >>>>> 109. DeleteShareGroupOffsetsResult exposes
> >>>>> public KafkaFuture<Void> partitionResult(final TopicPartition
> >> partition)
> >>>>> DeleteShareGroupsResult exposes
> >>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
> >>>>> Should we make them more consistent?
> >>>>>
> >>>>> 110. Should ShareGroupDescription include fields like GroupEpoch,
> >>>>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
> >>>>>
> >>>>> 111. Should GroupListing include GroupState?
> >>>>>
> >>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
> >>>>> Set<TopicPartition> directly?
> >>>>>
> >>>>> 113. ListShareGroupsResult.errors(): How do we know which group has
> an
> >>>>> error?
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
> >>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>
> >>>>>> Hi David,
> >>>>>> Thanks for your questions.
> >>>>>>
> >>>>>> 70. The Group Coordinator communicates with the Share Coordinator
> over
> >>>>>> RPCs.
> >>>>>> In the general case, it’s an inter-broker call. It is probably
> >> possible
> >>>> to
> >>>>>> optimise
> >>>>>> for the situation in which the appropriate GC and SC shards are
> >>>>>> co-located, but the
> >>>>>> KIP does not delve that deep into potential performance
> optimisations.
> >>>>>>
> >>>>>> 71. Avoiding collisions would be a good idea, but I do worry about
> >>>>>> retrospectively
> >>>>>> introducing a naming convention for groups. I feel that naming
> >>>> conventions
> >>>>>> will
> >>>>>> typically be the responsibility of the cluster administrators based
> on
> >>>>>> organizational
> >>>>>> factors, such as the name of an application.
> >>>>>>
> >>>>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID
> is
> >>>>>> correct but
> >>>>>> the group is the wrong type. The nearest existing error code that
> gets
> >>>>>> that across
> >>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing that
> a
> >>>> new
> >>>>>> error code would be better.
> >>>>>>
> >>>>>> 73. The Metadata fields are not used. I have removed them.
> >>>>>>
> >>>>>> 74. The metadata is re-evaluated on every change, but only a subset
> is
> >>>>>> relevant
> >>>>>> for rebalancing. A check is done against the names of the subscribed
> >>>>>> topics to
> >>>>>> see if any relevant changes may have occurred. Then the changes
> which
> >>>>>> trigger
> >>>>>> a rebalance are topic creation, deletion, change in partitions, or
> >> rack
> >>>>>> IDs for the
> >>>>>> replicas. I have updated the KIP to make this more clear.
> >>>>>>
> >>>>>> 75. The assignment is not persisted because it is much less
> important
> >>>> that
> >>>>>> the
> >>>>>> assignment survives a GC change. There’s no need to transfer
> >> partitions
> >>>>>> safely from
> >>>>>> member to member in the way that is required for consumer groups, so
> >> as
> >>>> an
> >>>>>> optimisation, the assignments for a share group are not persisted.
> It
> >>>>>> wouldn’t do any
> >>>>>> harm, but it just seems unnecessary.
> >>>>>>
> >>>>>> 76. In the event that a consumer tries to acknowledge a record that it
> >>>>>> no longer has the right to acknowledge, the INVALID_RECORD_STATE error
> >>>>>> code is used.
> >>>>>>
> >>>>>> If the application uses the KafkaShareConsumer.commitSync method, it
> >>>> will
> >>>>>> see an InvalidRecordState exception returned. Alternatively, the
> >>>>>> application can
> >>>>>> register an acknowledgement commit callback which will be called
> with
> >>>> the
> >>>>>> status
> >>>>>> of the acknowledgements that have succeeded or failed.
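> >>>>>>
> >>>>>> As a sketch of the callback route (based on the API proposed in the
> >>>>>> KIP; the exact callback signature may still change):
> >>>>>>
> >>>>>>   consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
> >>>>>>     if (exception != null) {
> >>>>>>       // e.g. InvalidRecordStateException if the delivery had already
> >>>>>>       // been revoked from this consumer.
> >>>>>>       System.err.println("Acknowledgement failed for " + offsets + ": " + exception);
> >>>>>>     }
> >>>>>>   });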
> >>>>>>
> >>>>>> 77. I have tried to tread a careful path with the durable
> >>>> share-partition
> >>>>>> state in this
> >>>>>> KIP. The significant choices I made are that:
> >>>>>> * Topics are used so that the state is replicated between brokers.
> >>>>>> * Log compaction is used to keep a lid on the storage.
> >>>>>> * Only one topic is required.
> >>>>>>
> >>>>>> Log compaction as it stands is not ideal for this kind of data, as
> >>>>>> evidenced by
> >>>>>> the DeltaIndex technique I employed.
> >>>>>>
> >>>>>> I can think of a few relatively simple ways to improve upon it.
> >>>>>>
> >>>>>> a) We could use a customised version of the log compactor for this
> >> topic
> >>>>>> that
> >>>>>> understands the rules for ShareCheckpoint and ShareDelta records.
> >>>>>> Essentially,
> >>>>>> for each share-partition, the latest ShareCheckpoint and any
> >> subsequent
> >>>>>> ShareDelta
> >>>>>> records must not be cleaned. Anything else can be cleaned. We could
> >> then
> >>>>>> be sure
> >>>>>> that multiple ShareDelta records with the same key would survive
> >>>> cleaning
> >>>>>> and we could
> >>>>>> abandon the DeltaIndex technique.
> >>>>>>
> >>>>>> b) Actually what is required is a log per share-partition. Let’s
> >> imagine
> >>>>>> that we had
> >>>>>> a share-state topic per topic being consumed in a share group, with
> >> the
> >>>>>> same number
> >>>>>> of partitions as the topic being consumed. We could write many more
> >>>> deltas
> >>>>>> between
> >>>>>> checkpoints, and just take periodic checkpoints to keep control of
> the
> >>>>>> storage used.
> >>>>>> Once a checkpoint has been taken, we could use
> >>>> KafkaAdmin.deleteRecords()
> >>>>>> to
> >>>>>> prune all of the older records.
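> >>>>>>
> >>>>>> For example, the pruning could be as simple as something like this,
> >>>>>> assuming an Admin client instance and an illustrative topic name and
> >>>>>> checkpoint offset:
> >>>>>>
> >>>>>>   // Prune everything before the offset of the latest checkpoint in one
> >>>>>>   // partition of a hypothetical share-state topic.
> >>>>>>   TopicPartition tp = new TopicPartition("__share_state-mygroup-mytopic", 0);
> >>>>>>   admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(checkpointOffset)))
> >>>>>>        .all()
> >>>>>>        .get();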
> >>>>>>
> >>>>>> The share-state topics would be more numerous, but we are talking
> one
> >>>> per
> >>>>>> topic
> >>>>>> per share group that it’s being consumed in. These topics would not
> be
> >>>>>> compacted.
> >>>>>>
> >>>>>> As you’ll see in the KIP, the Persister interface is intended to be
> >>>>>> pluggable one day.
> >>>>>> I know the scheme in the KIP is not ideal. It seems likely to me
> that
> >>>>>> future KIPs will
> >>>>>> improve upon it.
> >>>>>>
> >>>>>> If I can get buy-in for option (b), I’m happy to change this KIP.
> >> While
> >>>>>> option (a) is
> >>>>>> probably workable, it does seem a bit of a hack to have a customised
> >> log
> >>>>>> compactor
> >>>>>> just for this topic.
> >>>>>>
> >>>>>> 78. How about DeliveryState? I agree that State is overloaded.
> >>>>>>
> >>>>>> 79. See (77).
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>
> >>>>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io
> >>>> .INVALID>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
> >>>>>>>
> >>>>>>> I've finally made it through the KIP, still trying to grok the
> whole
> >>>>>> thing.
> >>>>>>> Sorry if some of my questions are basic :)
> >>>>>>>
> >>>>>>>
> >>>>>>> Concepts:
> >>>>>>>
> >>>>>>> 70. Does the Group Coordinator communicate with the Share
> Coordinator
> >>>>>> over
> >>>>>>> RPC or directly in-process?
> >>>>>>>
> >>>>>>> 71. For preventing name collisions with regular consumer groups,
> >> could
> >>>> we
> >>>>>>> define a reserved share group prefix? E.g., the operator defines
> >> "sg_"
> >>>>>> as a
> >>>>>>> prefix for share groups only, and if a regular consumer group tries
> >> to
> >>>>>> use
> >>>>>>> that name it fails.
> >>>>>>>
> >>>>>>> 72. When a consumer tries to use a share group, or a share consumer
> >>>> tries
> >>>>>>> to use a regular group, would INVALID_GROUP_ID make more sense
> >>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
> >>>>>>>
> >>>>>>> --------
> >>>>>>>
> >>>>>>> Share Group Membership:
> >>>>>>>
> >>>>>>> 73. What goes in the Metadata field for TargetAssignment#Member and
> >>>>>>> Assignment?
> >>>>>>>
> >>>>>>> 74. Under Trigger a rebalance, it says we rebalance when the
> >> partition
> >>>>>>> metadata changes. Would this be for any change, or just certain
> ones?
> >>>> For
> >>>>>>> example, if a follower drops out of the ISR and comes back, we
> >> probably
> >>>>>>> don't need to rebalance.
> >>>>>>>
> >>>>>>> 75. "For a share group, the group coordinator does *not* persist
> the
> >>>>>>> assignment" Can you explain why this is not needed?
> >>>>>>>
> >>>>>>> 76. " If the consumer just failed to heartbeat due to a temporary
> >>>> pause,
> >>>>>> it
> >>>>>>> could in theory continue to fetch and acknowledge records. When it
> >>>>>> finally
> >>>>>>> sends a heartbeat and realises it’s been kicked out of the group,
> it
> >>>>>> should
> >>>>>>> stop fetching records because its assignment has been revoked, and
> >>>> rejoin
> >>>>>>> the group."
> >>>>>>>
> >>>>>>> A consumer with a long pause might still deliver some buffered
> >> records,
> >>>>>> but
> >>>>>>> if the share group coordinator has expired its session, it wouldn't
> >>>>>> accept
> >>>>>>> acknowledgments for that share consumer. In such a case, is any
> kind
> >> of
> >>>>>>> error raised to the application like "hey, I know we gave you these
> >>>>>>> records, but really we shouldn't have" ?
> >>>>>>>
> >>>>>>>
> >>>>>>> -----
> >>>>>>>
> >>>>>>> Record Delivery and acknowledgement
> >>>>>>>
> >>>>>>> 77. If we guarantee that a ShareCheckpoint is written at least
> every
> >> so
> >>>>>>> often, could we add a new log compactor that avoids compacting
> >>>>>> ShareDelta-s
> >>>>>>> that are still "active" (i.e., not yet superseded by a new
> >>>>>>> ShareCheckpoint). Mechanically, this could be done by keeping the LSO
> LSO
> >> no
> >>>>>>> greater than the oldest "active" ShareCheckpoint. This might let us
> >>>>>> remove
> >>>>>>> the DeltaIndex thing.
> >>>>>>>
> >>>>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how
> >>>> about
> >>>>>>> MessageState? (State is kind of overloaded/ambiguous)
> >>>>>>>
> >>>>>>> 79. One possible limitation with the current persistence model is
> >> that
> >>>>>> all
> >>>>>>> the share state is stored in one topic. It seems like we are going
> to
> >>>> be
> >>>>>>> storing a lot more state than we do in __consumer_offsets since
> we're
> >>>>>>> dealing with message-level acks. With aggressive checkpointing and
> >>>>>>> compaction, we can mitigate the storage requirements, but the
> >>>> throughput
> >>>>>>> could be a limiting factor. Have we considered other possibilities
> >> for
> >>>>>>> persistence?
> >>>>>>>
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> David
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>
