Hi Jun,
Thanks for your reply.

123. The GC only sends the assignment in the ShareGroupHeartbeat
response if the member has just joined, or the assignment has been
recalculated. The latter condition is met when the GC fails over.

With a consumer group, it works a little differently. The heartbeating
occurs asynchronously with the reconciliation process in the client.
The client has reconciliation callbacks, as well as offsets to commit
before it can confirm that revocation has occurred. So, it might take
multiple heartbeats to complete the installation of a new target
assignment in the client.

With a share group, it’s more brutal. The GC sends the assignment
in a heartbeat response, and the client is assumed to have acted on
it immediately. Since share group assignment is really about balancing
consumers across the available partitions, rather than safely handing
ownership of partitions between consumers, there is less coordination.

I wonder whether there is one situation that does need considering.
If the network connection between the client and the GC is lost, it is
possible that a response containing the assignment is lost. Then,
the connection will be reestablished, and the assignment will only be
sent again when it’s next recalculated. Adding the equivalent of
ConsumerGroupHeartbeatRequest.TopicPartitions would be one
way to close that gap.
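
As a rough illustration of the rule described for (123), here is a minimal
sketch of the decision the GC makes for each heartbeat. The class, method and
parameter names are hypothetical and do not come from the KIP or the Kafka
code. The last branch only applies if the request were extended to carry the
member's current partitions, which is a possibility raised above, not
something the KIP defines.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; none of these names come from the KIP or the Kafka code base.
final class HeartbeatAssignmentSketch {

    /**
     * Decides whether a heartbeat response should carry the assignment.
     *
     * @param justJoined         the member joined with this heartbeat
     * @param recalculated       the assignment was recomputed since the last response,
     *                           which includes the case of a GC failover
     * @param reportedPartitions partitions the member says it has; empty when the
     *                           request does not carry them (the current behaviour)
     * @param targetPartitions   the coordinator's current assignment for the member
     */
    static boolean shouldSendAssignment(boolean justJoined,
                                        boolean recalculated,
                                        Optional<Set<String>> reportedPartitions,
                                        Set<String> targetPartitions) {
        if (justJoined || recalculated) {
            return true;
        }
        // Assumed extension: resend when the member's view differs from the target,
        // for example because an earlier response was lost with the connection.
        return reportedPartitions.isPresent()
            && !reportedPartitions.get().equals(targetPartitions);
    }
}
```

Without the extra request field, a member that missed a response has nothing
that triggers a resend until the next recalculation.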

125. Yes, you are right. The reconciliation that I describe in the
answer to (54) below is related. If AlterShareGroupOffsets is called
for a topic-partition which is not yet in ShareGroupPartitionMetadataValue,
it’s not really part of the group yet. So I think it’s permitted to fail
the AlterShareGroupOffsets because the topic-partition would not be
part of ListShareGroupOffsets result.

If AlterShareGroupOffsets is called for a topic-partition which is in
ShareGroupPartitionMetadataValue, then just calling
InitializeShareGroupState to set its offset would be sufficient without
writing ShareGroupPartitionMetadata again.
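
The two cases for (125) can be sketched as a single decision. This is only an
illustration: the class, enum and method names are hypothetical, the set of
strings stands in for the topic-partitions recorded in
ShareGroupPartitionMetadataValue, and the actual error code is not specified
here.

```java
import java.util.Set;

// Hypothetical sketch; the names are illustrative and not part of the KIP.
final class AlterOffsetsSketch {

    enum Action {
        // The topic-partition is not yet part of the group, so the request
        // for it is allowed to fail.
        FAIL_NOT_IN_GROUP,
        // The topic-partition is already recorded, so only
        // InitializeShareGroupState is needed and no metadata record is rewritten.
        INITIALIZE_STATE_ONLY
    }

    static Action decide(Set<String> recordedPartitions, String topicPartition) {
        return recordedPartitions.contains(topicPartition)
            ? Action.INITIALIZE_STATE_ONLY
            : Action.FAIL_NOT_IN_GROUP;
    }
}
```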

138. Added.

139. This week, I have been experimenting with using the existing
consumer metrics with share consumer code. That was my plan to get
started with metrics. While they work to some extent, I am not entirely
happy with the result.

I have added a basic set of share consumer-specific metrics to KIP-932
which I think is a step in the right direction. I anticipate a future KIP
that defines many more metrics and tackles concepts such as what
“lag” means for a share group. The metrics I’ve included are simply
counting and measuring the operations, which is a good start.

54. I think either with or without InitializingTopics/DeletingTopics in
ShareGroupPartitionMetadataValue works. However, I think there is
a deeper point behind what you said. The GC needs to be responsive
to topic creation and deletion, and addition of partitions in cases where
it doesn’t agree with InitializingTopics/DeletingTopics. So, we are
probably not saving anything by having those fields. As a result, I have
removed InitializingTopics and DeletingTopics and updated the KIP
accordingly. The GC needs to reconcile its view of the initialised
topic-partitions with ShareGroupPartitionMetadataValue and it will
initialize or delete share-group state accordingly.
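
The reconciliation in (54) amounts to two set differences. The sketch below
assumes the GC can compute the set of topic-partitions that should have
share-group state (called "wanted" here) and compare it with the set recorded
in ShareGroupPartitionMetadataValue. All names are hypothetical, and how
"wanted" is derived is left out.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the reconciliation step; not taken from the KIP.
final class ReconcileSketch {

    static final class Plan {
        final Set<String> toInitialize;
        final Set<String> toDelete;

        Plan(Set<String> toInitialize, Set<String> toDelete) {
            this.toInitialize = toInitialize;
            this.toDelete = toDelete;
        }
    }

    /**
     * @param wanted   topic-partitions that should have share-group state
     * @param recorded topic-partitions recorded as initialized
     */
    static Plan reconcile(Set<String> wanted, Set<String> recorded) {
        // Wanted but not recorded: state still has to be initialized.
        Set<String> toInitialize = new TreeSet<>(wanted);
        toInitialize.removeAll(recorded);

        // Recorded but no longer wanted (for example, the topic was deleted):
        // state has to be deleted.
        Set<String> toDelete = new TreeSet<>(recorded);
        toDelete.removeAll(wanted);

        return new Plan(toInitialize, toDelete);
    }
}
```

Because the plan is recomputed from the two sets, a GC that fails over part
way through simply reaches the same plan again, which is why recording the
intention separately does not save anything.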


I have made one more change to the KIP. The SharePartitionAssignor
interface has been renamed to
o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor and it
now extends the PartitionAssignor interface. It’s essentially a marker
of which partition assignors can be used with share groups.
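
To show what a marker interface means in practice, here is a small sketch
using simplified stand-ins. The interfaces below are deliberately not the real
Kafka ones (their names carry a "Sketch" suffix and they have a single
illustrative method), so treat every name as an assumption.

```java
// Simplified stand-in for the general assignor interface.
interface PartitionAssignorSketch {
    String name();
}

// Marker: adds no methods, it only tags assignors that may be used with share groups.
interface ShareGroupPartitionAssignorSketch extends PartitionAssignorSketch {
}

final class AssignorCheckSketch {
    // A configuration check could accept only assignors carrying the marker.
    static boolean usableWithShareGroups(PartitionAssignorSketch assignor) {
        return assignor instanceof ShareGroupPartitionAssignorSketch;
    }
}
```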

Thanks,
Andrew

> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID> wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 123. "it doesn’t need to confirm the assignment back to the GC."
>  Hmm, I thought the member needs to confirm the assignment to GC to
> avoid GC including the assignment in the heartbeat response continuously. I
> assume this is done by including the new group epoch in the heartbeat
> response.
> 
> 125. It's possible that the share partition has never been initialized when
> AlterShareGroupOffsets is called. If GC doesn't write
> ShareGroupPartitionMetadata and the GC fails over, it would reinitialize
> the share partition and lose the effect of AlterShareGroupOffsets. If the
> partition has already been initialized and it's recorded
> in ShareGroupPartitionMetadata, it's possible not to write
> ShareGroupPartitionMetadata again when handling AlterShareGroupOffsets.
> 
> 138. Could we add the flow in GC when a topic is deleted?
> 
> 139. Do we need to add any metrics in KafkaShareConsumer?
> 
> 54. "I don’t think there is any redundancy. The ShareGroupMemberMetadata
> does include a list of subscribed topics. However, if there is a period of
> time in which no members are subscribed to a particular topic, it does not
> mean that the topic’s ShareGroupState should be immediately removed, but it
> does mean that there will be no ShareGroupMemberMetadata records containing
> that topic."
>  I am still trying to understand the value of InitializingTopics and
> DeletingTopics in ShareGroupPartitionMetadataValue. They are used to
> remember the intention of an operation. However, GC still needs to handle
> the case when the intention is not safely recorded. If GC wants to
> initialize a new topic/partition, a simpler approach is for it to send an
> InitializeShareGroupState to the share coordinator and after receiving a
> success response, write ShareGroupPartitionMetadataValue with the
> initialized partition included in InitializedTopics. This saves a record
> write. It's possible for GC to fail in between the two steps. On failover,
> the new GC just repeats the process. The case that you mentioned above can
> still be achieved. If a partition is in InitializedTopics of
> ShareGroupPartitionMetadataValue, but no member subscribes to it, we can
> still keep the ShareGroupState as long as the topic still exists. The same
> optimization could be applied to DeletingTopics too.
> 
> Jun
> 
> 
> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
> 
>> Hi Jun,
>> Thanks for the reply.
>> 
>> 123. Every time the GC fails over, it needs to recompute the assignment
>> for every member. However, the impact of re-assignment is not that onerous.
>> If the recomputed assignments are the same, which they may well be, there
>> is no impact on the members at all.
>> 
>> On receiving the new assignment, the member adjusts the topic-partitions
>> in its share sessions, removing those which were revoked and adding those
>> which were assigned. It is able to acknowledge the records it fetched from
>> the partitions which have just been revoked, and it doesn’t need to confirm
>> the assignment back to the GC.
>> 
>> 125. I don’t think the GC needs to write ShareGroupPartitionMetadata
>> when processing AlterShareGroupOffsets. This is because the operation
>> happens as a result of an explicit administrative action and it is possible
>> to return a specific error code for each topic-partition. The cases where
>> ShareGroupPartitionMetadata is used are when a topic is added or removed
>> from the subscribed topics, or the number of partitions changes.
>> 
>> 130. I suppose that limits the minimum lock timeout for a cluster to
>> prevent a group from having an excessively low value. Config added.
>> 
>> 131. I have changed it to group.share.partition.max.record.locks.
>> 
>> 136.  When GC failover occurs, the GC gaining ownership of a partition of
>> the __consumer_offsets topic replays the records to build its state.
>> In the case of a share group, it learns:
>> 
>> * The share group and its group epoch (ShareGroupMetadata)
>> * The list of members (ShareGroupMemberMetadata)
>> * The list of share-partitions (ShareGroupPartitionMetadata)
>> 
>> It will recompute the assignments in order to respond to
>> ShareGroupHeartbeat requests. As a result, it bumps the group epoch.
>> 
>> I will update the KIP accordingly to confirm the behaviour.
>> 
>> 137.1: The GC and the SPL report the metrics in the
>> group-coordinator-metrics
>> group. Unlike consumer groups in which the GC performs offset commit,
>> the share group equivalent is performed by the SPL. So, I’ve grouped the
>> concepts which relate to the group in group-coordinator-metrics.
>> 
>> The SC reports the metrics in the share-coordinator-metrics group.
>> 
>> 137.2: There is one metric in both groups - partition-load-time. In the SC
>> group,
>> it refers to the time loading data from the share-group state topic so that
>> a ReadShareGroupState request can be answered. In the GC group,
>> it refers to the time to read the state from the persister. Apart from the
>> interbroker RPC latency of the read, they’re likely to be very close.
>> 
>> Later, for a cluster which is using a custom persister, the
>> share-coordinator
>> metrics would likely not be reported, and the persister would have its own
>> metrics.
>> 
>> 137.3: Correct. Fixed.
>> 
>> 137.4: Yes, it does include the time to write to the internal topic.
>> I’ve tweaked the description.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 22 Apr 2024, at 20:04, Jun Rao <j...@confluent.io.INVALID> wrote:
>>> 
>>> Hi, Andrew,
>>> 
>>> Thanks for the reply.
>>> 
>>> 123. "The share group does not persist the target assignment."
>>> What's the impact of this? Every time that GC fails over, it needs to
>>> recompute the assignment for every member. Do we expect the member
>>> assignment to change on every GC failover?
>>> 
>>> 125. Should the GC also write ShareGroupPartitionMetadata?
>>> 
>>> 127. So, group epoch is only propagated to SC when
>>> InitializeShareGroupState request is sent. This sounds good.
>>> 
>>> 130. Should we have a group.share.min.record.lock.duration.ms to pair
>> with
>>> group.share.max.record.lock.duration.ms?
>>> 
>>> 131. Sounds good. The name group.share.record.lock.partition.limit
>> doesn't
>>> seem very intuitive. How about something
>>> like group.share.partition.max.records.pending.ack?
>>> 
>>> 136. Could we describe the process of GC failover? I guess it needs to
>>> compute member reassignment and check if there is any new topic/partition
>>> matching the share subscription. Does it bump up the group epoch?
>>> 
>>> 137. Metrics:
>>> 137.1 It would be useful to document who reports each metric. Is it any
>>> broker, GC, SC or SPL?
>>> 137.2 partition-load-time: Is that the loading time at SPL or SC?
>>> 137.3 "The time taken in milliseconds to load the share-group state from
>>> the share-group state partitions loaded in the last 30 seconds."
>>> The window depends on metrics.num.samples and metrics.sample.window.ms
>>> and is not always 30 seconds, right?
>>> 137.4 Could you explain write/write-latency a bit more? Does it include
>> the
>>> time to write to the internal topic?
>>> 
>>> Jun
>>> 
>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
>> andrew_schofi...@live.com>
>>> wrote:
>>> 
>>>> Hi Jun,
>>>> Thanks for your comments.
>>>> 
>>>> 120. Thanks. Fixed.
>>>> 
>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
>>>> the update applies to. It should of course be the snapshot that precedes
>>>> it in the log. It’s just there to provide a consistency check.
>>>> 
>>>> I also noticed that ShareSnapshotValue was missing StateEpoch. It
>>>> isn’t any more.
>>>> 
>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
>>>> GroupEpoch, but in the code it does not. In fact, there is considerable
>>>> divergence between the KIP and the code for this record value schema
>>>> which I expect will be resolved when the migration code has been
>>>> completed.
>>>> 
>>>> 123. The share group does not persist the target assignment.
>>>> 
>>>> 124. Share groups have three kinds of record:
>>>> i) ShareGroupMetadata
>>>> - this contains the group epoch and is written whenever the group
>>>>   epoch changes.
>>>> ii) ShareGroupMemberMetadata
>>>>  - this does not contain the group epoch.
>>>> iii) ShareGroupPartitionMetadata
>>>>  - this currently contains the epoch, but I think that is unnecessary.
>>>>    For one thing, the ConsumerGroupPartitionMetadata definition
>>>>    contains the group epoch, but the value appears never to be set.
>>>>    David Jacot confirms that it’s not necessary and is removing it.
>>>> 
>>>> I have removed the Epoch from ShareGroupPartitionMetadata.
>>>> The only purpose of persisting the epoch for a share group is so
>> that
>>>> when a group coordinator takes over the share group, it is able to
>>>> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
>>>> is used for this.
>>>> 
>>>> 125. The group epoch will be incremented in this case and
>>>> consequently a ShareGroupMetadata will be written. KIP updated.
>>>> 
>>>> 126. Not directly. A share group can only be deleted when it has no
>>>> members, so the tombstones for ShareGroupMemberMetadata will
>>>> have been written when the members left. I have clarified this.
>>>> 
>>>> 127. The share coordinator is ignorant of the group epoch. When the
>>>> group coordinator is initializing the share-group state the first time
>> that
>>>> a share-partition is being added to an assignment in the group, the
>>>> group epoch is used as the state epoch. But as the group epoch
>>>> increases over time, the share coordinator is entirely unaware.
>>>> 
>>>> When the first consumer for a share-partition fetches records from a
>>>> share-partition leader, the SPL calls the share coordinator to
>>>> ReadShareGroupState. If the SPL has previously read the information
>>>> and again it’s going from 0 to 1 consumer, it confirms it's up to date
>> by
>>>> calling ReadShareGroupOffsetsState.
>>>> 
>>>> Even if many consumers are joining at the same time, any share-partition
>>>> which is being initialized will not be included in their assignments.
>> Once
>>>> the initialization is complete, the next rebalance will assign the
>>>> partition
>>>> to some consumers which will discover this by ShareGroupHeartbeat
>>>> response. And then, the fetching begins.
>>>> 
>>>> If an SPL receives a ShareFetch request before it’s read the state
>>>> from the SC, it can make the ShareFetch request wait up to MaxWaitMs
>>>> and then it can return an empty set of records if it’s still not ready.
>>>> 
>>>> So, I don’t believe there will be too much load. If a topic with many
>>>> partitions is added to the subscribed topics for a share group, the fact
>>>> that the assignments will only start to include the partitions as their
>>>> initialization completes should soften the impact.
>>>> 
>>>> 128, 129: The “proper” way to turn on this feature when it’s finished
>> will
>>>> be using `group.coordinator.rebalance.protocols` and `group.version`.
>>>> While it’s in Early Access and for test cases, the `group.share.enable`
>>>> configuration will turn it on.
>>>> 
>>>> I have described `group.share.enable` as an internal configuration in
>>>> the KIP.
>>>> 
>>>> 130. The config `group.share.record.lock.duration.ms` applies to groups
>>>> which do not specify a group-level configuration for lock duration. The
>>>> minimum and maximum for this configuration are intended to give it
>>>> sensible bounds.
>>>> 
>>>> If a group does specify its own `group.share.record.lock.duration.ms`,
>>>> the broker-level `group.share.max.record.lock.duration.ms` gives the
>>>> cluster administrator a way of setting a maximum value for all groups.
>>>> 
>>>> While editing, I renamed `group.share.record.lock.duration.max.ms` to
>>>> `group.share.max.record.lock.duration.ms` for consistency with the
>>>> rest of the min/max configurations.
>>>> 
>>>> 131. This is the limit per partition so you can go wider with multiple
>>>> partitions.
>>>> I have set the initial value low for safety. I expect to be able to
>>>> increase this
>>>> significantly when we have mature code which has been battle-tested.
>>>> Rather than try to guess how high it can safely go, I’ve erred on the
>> side
>>>> of
>>>> caution and expect to open it up in a future KIP.
>>>> 
>>>> 132. Good catch. The problem is that I have missed two group
>>>> configurations,
>>>> now added. These are group.share.session.timeout.ms and
>>>> group.share.heartbeat.timeout.ms . The configurations you mentioned
>>>> are the bounds for the group-level configurations.
>>>> 
>>>> 133. The name `group.share.max.size` was chosen to mirror the existing
>>>> `group.consumer.max.size`.
>>>> 
>>>> 134. It is intended to be a list of all of the valid assignors for the
>>>> cluster.
>>>> When the assignors are configurable at the group level, the group
>>>> configuration
>>>> will only be permitted to name an assignor which is in this list. For
>> now,
>>>> there
>>>> is no group configuration for assignor, so all groups get the one and
>> only
>>>> assignor in the list.
>>>> 
>>>> 135. It’s the number of threads per broker. For a cluster with a small
>>>> number of
>>>> brokers and a lot of share group activity, it may be appropriate to
>>>> increase
>>>> this. We will be able to give tuning advice once we have experience of
>> the
>>>> performance impact of increasing it.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>> 
>>>>> On 20 Apr 2024, at 00:14, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>> 
>>>>> Hi, Andrew,
>>>>> 
>>>>> Thanks for the reply. A few more comments.
>>>>> 
>>>>> 120. There is still reference to ConsumerGroupMetadataKey.
>>>>> 
>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change it since it's
>> not a
>>>>> snapshot?
>>>>> 
>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, but
>>>>> ShareGroupMemberMetadataValue does not. Do we know how the epoch is
>> being
>>>>> used in the consumer group and whether it's needed in the share group?
>>>>> 
>>>>> 123. There is no equivalent of ConsumerGroupTargetAssignmentMember for
>>>>> ShareGroup. How does the shareGroup persist the member assignment?
>>>>> 
>>>>> 124. Assign a share-partition: "When a topic-partition is assigned to a
>>>>> member of a share group for the first time, the group coordinator
>> writes
>>>> a
>>>>> ShareGroupPartitionMetadata record to the __consumer_offsets  topic".
>>>>> When does the coordinator write ShareGroupMetadata with the bumped
>>>> epoch?
>>>>> In general, is there a particular order to bump up the epoch in
>> different
>>>>> records? When can the new epoch be exposed to the sharePartitionLeader?
>>>> It
>>>>> would be useful to make this clear in other state changing operations
>>>> too.
>>>>> 
>>>>> 125. "Alter share group offsets Group coordinator and share coordinator
>>>>> Only empty share groups support this operation. The group coordinator
>>>> sends
>>>>> an InitializeShareGroupState  request to the share coordinator. The
>> share
>>>>> coordinator writes a ShareSnapshot record with the new state epoch to
>> the
>>>>> __share_group_state  topic."
>>>>> Does the operation need to write ShareGroupPartitionMetadata and
>>>>> ShareGroupMetadata (for the new epoch)?
>>>>> 
>>>>> 126. Delete share group: Does this operation also need to write a
>>>> tombstone
>>>>> for the ShareGroupMemberMetadata record?
>>>>> 
>>>>> 127. I was thinking about the impact on a new consumer joining the
>>>>> shareGroup. This causes the GroupCoordinator and the ShareCoordinator
>> to
>>>>> bump up the group epoch, which in turn causes the SharePartition leader
>>>> to
>>>>> reinitialize the state with ReadShareGroupOffsetsState. If many
>> consumers
>>>>> are joining a shareGroup around the same time, would there be too much
>>>> load
>>>>> for the ShareCoordinator and SharePartition leader?
>>>>> 
>>>>> 128. Since group.share.enable will be replaced by the group.version
>>>>> feature, should we make it an internal config?
>>>>> 
>>>>> 129. group.coordinator.rebalance.protocols: this seems redundant with
>>>>> group.share.enable?
>>>>> 
>>>>> 130. Why do we have both group.share.record.lock.duration.ms and
>>>>> group.share.record.lock.duration.max.ms, each with its own max value?
>>>>> 
>>>>> 131. group.share.record.lock.partition.limit defaults to 200. This
>> limits
>>>>> the max degree of consumer parallelism to 200, right? If there are
>>>> multiple
>>>>> records per batch, it could be even smaller.
>>>>> 
>>>>> 132. Why do we need all three of group.share.session.timeout.ms,
>>>>> group.share.min.session.timeout.ms and
>>>> group.share.max.session.timeout.ms?
>>>>> Session timeout can't be set by the client. Ditto for
>>>>> group.share.heartbeat.interval.ms.
>>>>> 
>>>>> 133. group.share.max.size: Would group.share.max.members.per.group be a
>>>>> more intuitive name?
>>>>> 
>>>>> 134. group.share.assignors: Why does it need to be a list?
>>>>> 
>>>>> 135. share.coordinator.threads: Is that per share coordinator or per
>>>> broker?
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
>>>> andrew_schofi...@live.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Jun,
>>>>>> Thanks for your reply.
>>>>>> 
>>>>>> 42.1. That’s a sensible improvement. Done.
>>>>>> 
>>>>>> 47,56. Done. All instances of BaseOffset changed to FirstOffset.
>>>>>> 
>>>>>> 105. I think that would be in a future KIP. Personally, I don’t mind
>>>> having
>>>>>> a non-contiguous set of values in this KIP.
>>>>>> 
>>>>>> 114. Done.
>>>>>> 
>>>>>> 115. If the poll is just returning a single record because there is
>> not
>>>>>> much
>>>>>> data to consume, committing on every record is OK. It’s inefficient
>> but
>>>>>> acceptable.
>>>>>> If the first poll returns just one record, but many more have piled up
>>>>>> while
>>>>>> the first one was being processed, the next poll has the opportunity
>> to
>>>>>> return
>>>>>> a bunch of records and then these will be able to be committed
>> together.
>>>>>> So, my answer is that optimisation on the broker to return batches of
>>>>>> records when the records are available is the approach we will take
>>>> here.
>>>>>> 
>>>>>> 116. Good idea. Done.
>>>>>> 
>>>>>> 117. I’ve rewritten the Kafka Broker Migration section. Let me know
>> what
>>>>>> you think.
>>>>>> I am discussing the configuration to enable the feature in the mailing
>>>>>> list with
>>>>>> David Jacot also, so I anticipate a bit of change in this area still.
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi, Andrew,
>>>>>>> 
>>>>>>> Thanks for the updated KIP.
>>>>>>> 
>>>>>>> 42.1 "If the share group offset is altered multiple times when the
>>>> group
>>>>>>> remains empty, it would be harmless if the same state epoch was
>> reused
>>>> to
>>>>>>> initialize the state."
>>>>>>> Hmm, how does the partition leader know for sure that it has received
>>>> the
>>>>>>> latest share group offset if epoch is reused?
>>>>>>> Could we update the section "Group epoch - Trigger a rebalance" that
>>>>>>> AdminClient.alterShareGroupOffsets causes the group epoch to be
>> bumped
>>>>>> too?
>>>>>>> 
>>>>>>> 47,56 "my view is that BaseOffset should become FirstOffset in ALL
>>>>>> schemas
>>>>>>> defined in the KIP."
>>>>>>> Yes, that seems better to me.
>>>>>>> 
>>>>>>> 105. "I have another non-terminal state in mind for 3."
>>>>>>> Should we document it?
>>>>>>> 
>>>>>>> 114. session.timeout.ms in the consumer configuration is deprecated
>> in
>>>>>>> KIP-848. So, we need to remove it from the shareConsumer
>> configuration.
>>>>>>> 
>>>>>>> 115. I am wondering if it's a good idea to always commit acks on
>>>>>>> ShareConsumer.poll(). In the extreme case, each batch may only
>> contain
>>>> a
>>>>>>> single record and each poll() only returns a single batch. This will
>>>>>> cause
>>>>>>> each record to be committed individually. Is there a way for a user
>> to
>>>>>>> optimize this?
>>>>>>> 
>>>>>>> 116. For each new RPC, could we list the associated acls?
>>>>>>> 
>>>>>>> 117. Since this KIP changes internal records and RPCs, it would be
>>>> useful
>>>>>>> to document the upgrade process.
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
>>>>>> andrew_schofi...@live.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Jun,
>>>>>>>> Thanks for your questions.
>>>>>>>> 
>>>>>>>> 41.
>>>>>>>> 41.1. The partition leader obtains the state epoch in the response
>>>> from
>>>>>>>> ReadShareGroupState. When it becomes a share-partition leader,
>>>>>>>> it reads the share-group state and one of the things it learns is
>> the
>>>>>>>> current state epoch. Then it uses the state epoch in all subsequent
>>>>>>>> calls to WriteShareGroupState. The fencing is to prevent writes for
>>>>>>>> a previous state epoch, which are very unlikely but which would mean
>>>>>>>> that a leader was using an out-of-date epoch and was likely no
>> longer
>>>>>>>> the current leader at all, perhaps due to a long pause for some
>>>> reason.
>>>>>>>> 
>>>>>>>> 41.2. If the group coordinator were to set the SPSO, wouldn’t it
>> need
>>>>>>>> to discover the initial offset? I’m trying to avoid yet another
>>>>>>>> inter-broker
>>>>>>>> hop.
>>>>>>>> 
>>>>>>>> 42.
>>>>>>>> 42.1. I think I’ve confused things. When the share group offset is
>>>>>> altered
>>>>>>>> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
>>>>>>>> update the state epoch. I don’t think it needs to update the group
>>>> epoch
>>>>>>>> at the same time (although it could) because the group epoch will
>> have
>>>>>>>> been bumped when the group became empty. If the share group offset
>>>>>>>> is altered multiple times when the group remains empty, it would be
>>>>>>>> harmless if the same state epoch was reused to initialize the state.
>>>>>>>> 
>>>>>>>> When the share-partition leader updates the SPSO as a result of
>>>>>>>> the usual flow of record delivery, it does not update the state
>> epoch.
>>>>>>>> 
>>>>>>>> 42.2. The share-partition leader will notice the alteration because,
>>>>>>>> when it issues WriteShareGroupState, the response will contain the
>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to be the
>>>>>>>> last-resort way of catching this.
>>>>>>>> 
>>>>>>>> When the share-partition leader handles its first ShareFetch
>> request,
>>>>>>>> it learns the state epoch from the response to ReadShareGroupState.
>>>>>>>> 
>>>>>>>> In normal running, the state epoch will remain constant, but, when
>>>> there
>>>>>>>> are no consumers and the group is empty, it might change. As a
>> result,
>>>>>>>> I think it would be sensible when the set of share sessions
>>>> transitions
>>>>>>>> from 0 to 1, which is a reasonable proxy for the share group
>>>>>> transitioning
>>>>>>>> from empty to non-empty, for the share-partition leader to issue
>>>>>>>> ReadShareGroupOffsetsState to validate the state epoch. If its state
>>>>>>>> epoch is out of date, it can then ReadShareGroupState to
>>>> re-initialize.
>>>>>>>> 
>>>>>>>> I’ve changed the KIP accordingly.
>>>>>>>> 
>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
>>>>>>>> a clear view of which is the correct term. Having reviewed all of
>> the
>>>>>>>> instances, my view is that BaseOffset should become FirstOffset in
>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is just used in
>>>>>>>> record batches, which is already a known concept.
>>>>>>>> 
>>>>>>>> Please let me know if you agree.
>>>>>>>> 
>>>>>>>> 60. I’ve added FindCoordinator to the top level index for protocol
>>>>>> changes.
>>>>>>>> 
>>>>>>>> 61. OK. I expect you are correct about how users will be using the
>>>>>>>> console share consumer. When I use the console consumer, I always
>> get
>>>>>>>> a new consumer group. I have changed the default group ID for
>> console
>>>>>>>> share consumer to “console-share-consumer” to match the console
>>>> consumer
>>>>>>>> better and give more of an idea where this mysterious group has come
>>>>>> from.
>>>>>>>> 
>>>>>>>> 77. I will work on a proposal that does not use compaction and we
>> can
>>>>>>>> make a judgement about whether it’s a better course for KIP-932.
>>>>>>>> Personally,
>>>>>>>> until I’ve written it down and lived with the ideas for a few days,
>> I
>>>>>>>> won’t be
>>>>>>>> able to choose which I prefer.
>>>>>>>> 
>>>>>>>> I should be able to get the proposal written by the end of this
>> week.
>>>>>>>> 
>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
>>>>>>>> I prefer to maintain the consistency.
>>>>>>>> 
>>>>>>>> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was
>>>>>>>> originally
>>>>>>>> created from KIP-848. This part of the schema does not apply and I
>>>> have
>>>>>>>> removed
>>>>>>>> it. I have also renamed AssignedTopicPartitions to simply
>>>>>> TopicPartitions
>>>>>>>> which
>>>>>>>> aligns with the actual definition of ConsumerGroupHeartbeatResponse.
>>>>>>>> 
>>>>>>>> 102. No, I don’t think we do. Removed.
>>>>>>>> 
>>>>>>>> 103. I’ve changed the description for the error codes for
>>>>>>>> ShareFetchResponse.
>>>>>>>> 
>>>>>>>> 104. Interesting. I have added ErrorMessages to these RPCs as you
>>>>>> suggest.
>>>>>>>> It’s a good improvement for problem determination.
>>>>>>>> 
>>>>>>>> 105. The values are reserved in my brain. Actually, 1 is Acquired
>>>> which
>>>>>>>> is not persisted, and I have another non-terminal state in mind for
>> 3.
>>>>>>>> 
>>>>>>>> 106. A few people have raised the question of whether
>>>> OffsetAndMetadata
>>>>>>>> is sensible in this KIP, given that the optional Metadata part comes
>>>>>> from
>>>>>>>> when
>>>>>>>> a regular consumer commits offsets. However, it is correct that
>> there
>>>>>> will
>>>>>>>> never be metadata with a share group. I have changed
>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
>>>>>>>> 
>>>>>>>> 107. Yes, you are right. I have learnt during this process that a
>>>>>> version
>>>>>>>> bump
>>>>>>>> can be a logical not just a physical change to the schema. KIP
>>>> updated.
>>>>>>>> 
>>>>>>>> 108. I would prefer not to extend this RPC for all of the states at
>>>> this
>>>>>>>> point.
>>>>>>>> I think there is definitely scope for another KIP focused on
>>>>>> administration
>>>>>>>> of share groups that might want this information so someone could
>>>> build
>>>>>> a
>>>>>>>> UI and other tools on top. Doing that well is quite a lot of work in
>>>> its
>>>>>>>> own right
>>>>>>>> so I would prefer not to do that now.
>>>>>>>> 
>>>>>>>> 109. Yes, they’re inconsistent and follow the consumer group equivalents
>>>>>>>> which are also inconsistent. In general, the KafkaAdmin.delete… methods
>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like DeleteShareGroupsResult.
>>>>>>>> 
>>>>>>>> Would you prefer that I do that, or remain consistent with consumer
>>>>>> groups?
>>>>>>>> Happy to change it.
>>>>>>>> 
>>>>>>>> 110. Again, consumer groups don’t yield that level of detail about
>>>>>> epochs.
>>>>>>>> The MemberDescription does include the assignment, but not the list
>> of
>>>>>>>> subscribed topic names.
>>>>>>>> 
>>>>>>>> 111. I didn’t include GroupState in GroupListing because there’s no
>>>>>>>> single class that includes the states of all group types.
>>>>>>>> 
>>>>>>>> 112. I think it’s good practice for the API to have
>>>>>>>> ListShareGroupOffsetSpec.
>>>>>>>> It makes evolution and extension of the API much easier. Also, it
>>>>>> matches
>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec.
>>>>>>>> 
>>>>>>>> 113. ListConsumerGroupsResults.errors() is the same. I think you
>> just
>>>>>> have
>>>>>>>> to look in the exception details and the same pattern is being
>>>> followed
>>>>>>>> here.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Over the next few days, I have committed to writing a proposal for
>> how
>>>>>> to
>>>>>>>> persist
>>>>>>>> share-group state that doesn’t use log compaction.
>>>>>>>> 
>>>>>>>> I am also aware that discussion with Justine Olshan on
>> read-committed
>>>>>>>> isolation level is not yet quite complete.
>>>>>>>> 
>>>>>>>> Thanks for the detailed review.
>>>>>>>> 
>>>>>>>> Andrew
>>>>>>>> 
>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, Andrew,
>>>>>>>>> 
>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>> 
>>>>>>>>> 41.
>>>>>>>>> 41.1 How does the partition leader obtain the group epoch to set
>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
>>>>>>>>> 41.2 What's the benefit of having the group coordinator initialize the
>>>>>>>>> state and the partition leader set the SPSO? It seems simpler to have the
>>>>>>>>> partition leader initialize both the state and the SPSO together?
>>>>>>>>> 
>>>>>>>>> 42.
>>>>>>>>> 42.1 "I don’t think the group epoch needs to be bumped when the share group
>>>>>>>>> offset is altered."
>>>>>>>>> But the part on handling Alter share group offsets says "The share
>>>>>>>>> coordinator writes a ShareCheckpoint record with the new state epoch to the
>>>>>>>>> __share_group_state topic." So, which is correct? We have two paths to
>>>>>>>>> update the state in the share coordinator, one from the group coordinator
>>>>>>>>> and another from the partition leader. I thought the benefit of bumping up
>>>>>>>>> the epoch is to fence off a late request in the previous epoch from another
>>>>>>>>> path.
>>>>>>>>> 42.2 When the group coordinator alters the share group offset in the share
>>>>>>>>> coordinator, how does the partition leader know the share group state has
>>>>>>>>> been altered so that it could clear its in-memory state?
>>>>>>>>> 
>>>>>>>>> 47. 56. BaseOffset typically refers to the base offset for the batch and
>>>>>>>>> can be confusing. FirstOffset is clearer and matches LastOffset.
>>>>>>>>> 
>>>>>>>>> 60. Could we include FindCoordinatorRequest in the top level index for
>>>>>>>>> Kafka protocol changes?
>>>>>>>>> 
>>>>>>>>> 61. I think it's probably ok to add time-based expiration later. But using
>>>>>>>>> a default group in console-share-consumer probably won't help reduce the
>>>>>>>>> garbage. In the common case, the user of the console consumer likely wants
>>>>>>>>> to see the recently produced records for verification. If the default group
>>>>>>>>> doesn't provide that (because of the stale state), the user will likely
>>>>>>>>> just use a new group. It's true that we don't garbage collect idle topics.
>>>>>>>>> However, share groups are similar to consumer groups, which do support
>>>>>>>>> garbage collection. Typically, we read topics more often than we create
>>>>>>>>> them.
>>>>>>>>> 
>>>>>>>>> 77. If the generic compaction is inconvenient, we could use customized
>>>>>>>>> logic. If we go with that route, option (b) seems cleaner and more
>>>>>>>>> optimized. Since the share states for all groups fit in memory, we could
>>>>>>>>> generate snapshots more efficiently than going through compaction. Having a
>>>>>>>>> separate log per share partition is probably too much overhead. It's more
>>>>>>>>> efficient to put the state changes for multiple share partitions in a
>>>>>>>>> single log.
>>>>>>>>> 
>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
>>>>>>>>> SessionTimeoutMs?
>>>>>>>>> 
>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could
>>>>>>>>> we have when assigning partitions? What are the corresponding error codes?
>>>>>>>>> 
>>>>>>>>> 102. Do we still need
>>>>>>>>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>>>>>>>>> 
>>>>>>>>> 103. Could we list the error codes separately for
>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>>>>>>>>> 
>>>>>>>>> 104. Should we add an error message for the errorCode in ShareFetchResponse,
>>>>>>>>> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
>>>>>>>>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
>>>>>>>>> ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?
>>>>>>>>> 
>>>>>>>>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1
>>>>>>>>> and 3? Are we leaving them out intentionally?
>>>>>>>>> 
>>>>>>>>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
>>>>>>>>> remove it from AdminClient and KafkaShareConsumer?
>>>>>>>>> 
>>>>>>>>> 107. ListGroupsRequest: Should we bump up the version since it now supports
>>>>>>>>> a new group type "share"?
>>>>>>>>> 
>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it expose all the states
>>>>>>>>> from ReadShareGroupStateResponse, instead of just SPSO?
>>>>>>>>> 
>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
>>>>>>>>> public KafkaFuture<Void> partitionResult(final TopicPartition partition)
>>>>>>>>> DeleteShareGroupsResult exposes
>>>>>>>>> public Map<String, KafkaFuture<Void>> deletedGroups()
>>>>>>>>> Should we make them more consistent?
>>>>>>>>> 
>>>>>>>>> 110. Should ShareGroupDescription include fields like GroupEpoch,
>>>>>>>>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>>>>>>>>> 
>>>>>>>>> 111. Should GroupListing include GroupState?
>>>>>>>>> 
>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
>>>>>>>>> Set<TopicPartition> directly?
>>>>>>>>> 
>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know which group has an
>>>>>>>>> error?
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi David,
>>>>>>>>>> Thanks for your questions.
>>>>>>>>>> 
>>>>>>>>>> 70. The Group Coordinator communicates with the Share Coordinator over
>>>>>>>>>> RPCs. In the general case, it’s an inter-broker call. It is probably
>>>>>>>>>> possible to optimise for the situation in which the appropriate GC and SC
>>>>>>>>>> shards are co-located, but the KIP does not delve that deep into potential
>>>>>>>>>> performance optimisations.
>>>>>>>>>> 
>>>>>>>>>> 71. Avoiding collisions would be a good idea, but I do worry about
>>>>>>>>>> retrospectively introducing a naming convention for groups. I feel that
>>>>>>>>>> naming conventions will typically be the responsibility of the cluster
>>>>>>>>>> administrators based on organizational factors, such as the name of an
>>>>>>>>>> application.
>>>>>>>>>> 
>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
>>>>>>>>>> correct but the group is the wrong type. The nearest existing error code
>>>>>>>>>> that gets that across is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is
>>>>>>>>>> really showing that a new error code would be better.
>>>>>>>>>> 
>>>>>>>>>> 73. The Metadata fields are not used. I have removed them.
>>>>>>>>>> 
>>>>>>>>>> 74. The metadata is re-evaluated on every change, but only a subset is
>>>>>>>>>> relevant for rebalancing. A check is done against the names of the
>>>>>>>>>> subscribed topics to see if any relevant changes may have occurred. Then
>>>>>>>>>> the changes which trigger a rebalance are topic creation, deletion, change
>>>>>>>>>> in partitions, or rack IDs for the replicas. I have updated the KIP to make
>>>>>>>>>> this clearer.
>>>>>>>>>> 
>>>>>>>>>> 75. The assignment is not persisted because it is much less important that
>>>>>>>>>> the assignment survives a GC change. There’s no need to transfer partitions
>>>>>>>>>> safely from member to member in the way that is required for consumer
>>>>>>>>>> groups, so as an optimisation, the assignments for a share group are not
>>>>>>>>>> persisted. It wouldn’t do any harm, but it just seems unnecessary.
>>>>>>>>>> 
>>>>>>>>>> 76. In the event that a consumer tries to acknowledge a record that it no
>>>>>>>>>> longer has the right to acknowledge, the INVALID_RECORD_STATE error code is
>>>>>>>>>> used.
>>>>>>>>>> 
>>>>>>>>>> If the application uses the KafkaShareConsumer.commitSync method, it will
>>>>>>>>>> see an InvalidRecordState exception returned. Alternatively, the
>>>>>>>>>> application can register an acknowledgement commit callback which will be
>>>>>>>>>> called with the status of the acknowledgements that have succeeded or
>>>>>>>>>> failed.
>>>>>>>>>> 
>>>>>>>>>> 77. I have tried to tread a careful path with the durable share-partition
>>>>>>>>>> state in this KIP. The significant choices I made are that:
>>>>>>>>>> * Topics are used so that the state is replicated between brokers.
>>>>>>>>>> * Log compaction is used to keep a lid on the storage.
>>>>>>>>>> * Only one topic is required.
>>>>>>>>>> 
>>>>>>>>>> Log compaction as it stands is not ideal for this kind of data, as
>>>>>>>>>> evidenced by the DeltaIndex technique I employed.
>>>>>>>>>> 
>>>>>>>>>> I can think of a few relatively simple ways to improve upon it.
>>>>>>>>>> 
>>>>>>>>>> a) We could use a customised version of the log compactor for this topic
>>>>>>>>>> that understands the rules for ShareCheckpoint and ShareDelta records.
>>>>>>>>>> Essentially, for each share-partition, the latest ShareCheckpoint and any
>>>>>>>>>> subsequent ShareDelta records must not be cleaned. Anything else can be
>>>>>>>>>> cleaned. We could then be sure that multiple ShareDelta records with the
>>>>>>>>>> same key would survive cleaning and we could abandon the DeltaIndex
>>>>>>>>>> technique.
>>>>>>>>>> 
>>>>>>>>>> b) Actually what is required is a log per share-partition. Let’s imagine
>>>>>>>>>> that we had a share-state topic per topic being consumed in a share group,
>>>>>>>>>> with the same number of partitions as the topic being consumed. We could
>>>>>>>>>> write many more deltas between checkpoints, and just take periodic
>>>>>>>>>> checkpoints to keep control of the storage used. Once a checkpoint has been
>>>>>>>>>> taken, we could use KafkaAdmin.deleteRecords() to prune all of the older
>>>>>>>>>> records.
>>>>>>>>>> 
>>>>>>>>>> The share-state topics would be more numerous, but we are talking one per
>>>>>>>>>> topic per share group that it’s being consumed in. These topics would not
>>>>>>>>>> be compacted.
>>>>>>>>>> 
>>>>>>>>>> As you’ll see in the KIP, the Persister interface is intended to be
>>>>>>>>>> pluggable one day. I know the scheme in the KIP is not ideal. It seems
>>>>>>>>>> likely to me that future KIPs will improve upon it.
>>>>>>>>>> 
>>>>>>>>>> If I can get buy-in for option (b), I’m happy to change this KIP. While
>>>>>>>>>> option (a) is probably workable, it does seem a bit of a hack to have a
>>>>>>>>>> customised log compactor just for this topic.
>>>>>>>>>> 
>>>>>>>>>> 78. How about DeliveryState? I agree that State is overloaded.
>>>>>>>>>> 
>>>>>>>>>> 79. See (77).
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>>>>>>>>> 
>>>>>>>>>>> I've finally made it through the KIP, still trying to grok the whole
>>>>>>>>>>> thing. Sorry if some of my questions are basic :)
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Concepts:
>>>>>>>>>>> 
>>>>>>>>>>> 70. Does the Group Coordinator communicate with the Share Coordinator over
>>>>>>>>>>> RPC or directly in-process?
>>>>>>>>>>> 
>>>>>>>>>>> 71. For preventing name collisions with regular consumer groups, could we
>>>>>>>>>>> define a reserved share group prefix? E.g., the operator defines "sg_" as a
>>>>>>>>>>> prefix for share groups only, and if a regular consumer group tries to use
>>>>>>>>>>> that name it fails.
>>>>>>>>>>> 
>>>>>>>>>>> 72. When a consumer tries to use a share group, or a share consumer tries
>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID make more sense
>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
>>>>>>>>>>> 
>>>>>>>>>>> --------
>>>>>>>>>>> 
>>>>>>>>>>> Share Group Membership:
>>>>>>>>>>> 
>>>>>>>>>>> 73. What goes in the Metadata field for TargetAssignment#Member and
>>>>>>>>>>> Assignment?
>>>>>>>>>>> 
>>>>>>>>>>> 74. Under Trigger a rebalance, it says we rebalance when the partition
>>>>>>>>>>> metadata changes. Would this be for any change, or just certain ones? For
>>>>>>>>>>> example, if a follower drops out of the ISR and comes back, we probably
>>>>>>>>>>> don't need to rebalance.
>>>>>>>>>>> 
>>>>>>>>>>> 75. "For a share group, the group coordinator does *not* persist the
>>>>>>>>>>> assignment" Can you explain why this is not needed?
>>>>>>>>>>> 
>>>>>>>>>>> 76. "If the consumer just failed to heartbeat due to a temporary pause, it
>>>>>>>>>>> could in theory continue to fetch and acknowledge records. When it finally
>>>>>>>>>>> sends a heartbeat and realises it’s been kicked out of the group, it should
>>>>>>>>>>> stop fetching records because its assignment has been revoked, and rejoin
>>>>>>>>>>> the group."
>>>>>>>>>>> 
>>>>>>>>>>> A consumer with a long pause might still deliver some buffered records, but
>>>>>>>>>>> if the share group coordinator has expired its session, it wouldn't accept
>>>>>>>>>>> acknowledgments for that share consumer. In such a case, is any kind of
>>>>>>>>>>> error raised to the application like "hey, I know we gave you these
>>>>>>>>>>> records, but really we shouldn't have" ?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -----
>>>>>>>>>>> 
>>>>>>>>>>> Record Delivery and acknowledgement
>>>>>>>>>>> 
>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is written at least every so
>>>>>>>>>>> often, could we add a new log compactor that avoids compacting ShareDelta-s
>>>>>>>>>>> that are still "active" (i.e., not yet superseded by a new
>>>>>>>>>>> ShareCheckpoint)? Mechanically, this could be done by keeping the LSO no
>>>>>>>>>>> greater than the oldest "active" ShareCheckpoint. This might let us remove
>>>>>>>>>>> the DeltaIndex thing.
>>>>>>>>>>> 
>>>>>>>>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how about
>>>>>>>>>>> MessageState? (State is kind of overloaded/ambiguous)
>>>>>>>>>>> 
>>>>>>>>>>> 79. One possible limitation with the current persistence model is that all
>>>>>>>>>>> the share state is stored in one topic. It seems like we are going to be
>>>>>>>>>>> storing a lot more state than we do in __consumer_offsets since we're
>>>>>>>>>>> dealing with message-level acks. With aggressive checkpointing and
>>>>>>>>>>> compaction, we can mitigate the storage requirements, but the throughput
>>>>>>>>>>> could be a limiting factor. Have we considered other possibilities for
>>>>>>>>>>> persistence?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> David
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
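
As an illustration of option (a) in item 77 above, the retention rule can be
sketched in a few lines of Java. This is only a sketch under stated
assumptions: the Rec type, the Kind enum, the string share-partition key and
the retainedOffsets helper are all hypothetical names invented here, not part
of the KIP or of Kafka's log cleaner. It models the log as an in-memory list
and returns the offsets a customised cleaner would have to keep: for each
share-partition, the latest ShareCheckpoint and any ShareDelta written after it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShareStateCleanerSketch {

    /** Hypothetical record kinds, standing in for ShareCheckpoint and ShareDelta. */
    public enum Kind { CHECKPOINT, DELTA }

    /** Hypothetical, simplified view of one record in the share-state log. */
    public static final class Rec {
        final long offset;
        final String sharePartitionKey; // assumed to identify group + topic-partition
        final Kind kind;

        public Rec(long offset, String sharePartitionKey, Kind kind) {
            this.offset = offset;
            this.sharePartitionKey = sharePartitionKey;
            this.kind = kind;
        }
    }

    /** Returns, in log order, the offsets that must survive cleaning. */
    public static List<Long> retainedOffsets(List<Rec> log) {
        // Pass 1: find the offset of the latest checkpoint for each share-partition.
        Map<String, Long> latestCheckpoint = new HashMap<>();
        for (Rec r : log) {
            if (r.kind == Kind.CHECKPOINT) {
                latestCheckpoint.merge(r.sharePartitionKey, r.offset, Long::max);
            }
        }

        // Pass 2: keep the latest checkpoint itself and everything after it.
        // Records for a share-partition with no checkpoint yet are always kept,
        // because nothing has superseded them.
        List<Long> retained = new ArrayList<>();
        for (Rec r : log) {
            Long checkpointOffset = latestCheckpoint.get(r.sharePartitionKey);
            if (checkpointOffset == null || r.offset >= checkpointOffset) {
                retained.add(r.offset);
            }
        }
        return retained;
    }
}
```

With this rule, several ShareDelta records sharing one key all survive as long
as they follow the latest checkpoint, which is the property that would make
the DeltaIndex technique unnecessary.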