Hi Chirag,
Thanks for your question.

28. Good catch. Those options were omitted in error. I will update the KIP.

Thanks,
Andrew

> On 12 Feb 2024, at 13:06, Chirag Wadhwa <cwad...@confluent.io.INVALID> wrote:
>
> Hi Andrew,
>
> Thank you for the KIP, it is a great read! I just have a small question.
>
> 28. I noticed that the "--state" and "--timeout" options are not
> mentioned for the kafka-share-groups.sh tool. Was this omission
> intentional, or is it possibly an oversight in the KIP?
> Thanks,
> Chirag
>
> On Mon, Feb 12, 2024 at 5:25 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
>> Hi Jun
>> Thanks for your comments.
>>
>> 10. For read-uncommitted isolation level, the consumer just reads all
>> records. For read-committed isolation level, the share-partition leader
>> does the filtering to enable correct skipping of aborted records. The
>> consumers in a share group are not aware of the filtering, unlike
>> consumers in consumer groups.
>>
>> 11. The “classic” type refers to the pre-KIP-848 consumer groups.
>>
>> 12. By setting the configuration for a group resource, you are saying
>> “when a new group is created with this name, it must have this type”.
>> It’s not changing the type of an existing group.
>>
>> 13. Good catch. The Server Assignor should be at group level. I will
>> change it.
>>
>> 14. That is true. I have maintained it to keep similarity with consumer
>> groups, but it is not currently exposed to clients. It might be best to
>> remove it.
>>
>> 15. I had intended that SimpleAssignor implements
>> org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.
>> Actually, I think there’s benefit to using a new interface so that
>> someone doesn’t inadvertently configure something like the
>> RoundRobinAssignor for a share group. It wouldn’t go well. I will add a
>> new interface to the KIP.
>>
>> 16. When an existing member issues a ShareGroupHeartbeatRequest to the
>> new coordinator, the coordinator returns UNKNOWN_MEMBER_ID. The client
>> then sends another ShareGroupHeartbeatRequest containing no member ID
>> and epoch 0. The coordinator then returns the member ID.
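
For illustration only, the rejoin sequence in 16 could be modelled roughly
as below. This is a sketch in Python, not the client code: the Coordinator
class, the HeartbeatResponse shape and the helper name are hypothetical, and
starting a new member at epoch 1 is an assumption. Only UNKNOWN_MEMBER_ID
and the "no member ID, epoch 0" rejoin come from the thread.

```python
import uuid
from dataclasses import dataclass
from typing import Optional

UNKNOWN_MEMBER_ID = "UNKNOWN_MEMBER_ID"  # error name taken from the thread


@dataclass
class HeartbeatResponse:
    error: Optional[str]
    member_id: Optional[str]
    member_epoch: int


class Coordinator:
    """Hypothetical in-memory stand-in for a share group coordinator."""

    def __init__(self):
        self.members = {}  # member_id -> member epoch

    def heartbeat(self, member_id, member_epoch):
        if member_id is None and member_epoch == 0:
            # A join: allocate a fresh member ID (epoch 1 is an assumption).
            new_id = str(uuid.uuid4())
            self.members[new_id] = 1
            return HeartbeatResponse(None, new_id, 1)
        if member_id not in self.members:
            # Membership was not carried across the coordinator failover.
            return HeartbeatResponse(UNKNOWN_MEMBER_ID, None, 0)
        return HeartbeatResponse(None, member_id, self.members[member_id])


def heartbeat_with_rejoin(coordinator, member_id, member_epoch):
    """Send a heartbeat; on UNKNOWN_MEMBER_ID, rejoin with no ID and epoch 0."""
    response = coordinator.heartbeat(member_id, member_epoch)
    if response.error == UNKNOWN_MEMBER_ID:
        response = coordinator.heartbeat(None, 0)
    return response
```

A member that joined through the old coordinator and then heartbeats to a
fresh Coordinator() via heartbeat_with_rejoin() ends up with a new member ID.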
>>
>> 17. I don’t think so. What is the client going to do with the exception?
>> Share groups are intentionally removing some of the details of using
>> Kafka offsets from the consumers. If the SPSO needs to be reset due to
>> retention, it just does that automatically.
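
As a rough sketch of what "does that automatically" could mean in 17,
assuming the reset simply moves the SPSO up to the log start offset once
retention has removed the records below it (the function name is
hypothetical, and the exact rule is the KIP's to define):

```python
def effective_spso(spso: int, log_start_offset: int) -> int:
    """Return the SPSO after retention, never below the log start offset."""
    # Assumption: records below log_start_offset no longer exist, so the
    # SPSO is moved forward rather than reporting an error to the client.
    return max(spso, log_start_offset)
```

With a log start offset of 100, effective_spso(50, 100) gives 100, while an
SPSO of 150 is left where it is.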
>>
>> 18. The proposed use of control records needs some careful thought.
>> 18.1. They’re written by the share-partition leader, not the coordinator.
>> 18.2. If the client commits the acknowledgement, it is only confirmed to
>> the client once it has been replicated to the other replica brokers. So,
>> committing an acknowledgement is very similar to sending a record to a
>> topic in terms of the behaviour.
>>
>> 19. You are correct. The possibility of record duplication exists in
>> failure scenarios. A future KIP will add EOS support for share groups.
>>
>> 20.1. Yes, an exception. I was thinking InvalidOffsetException. I will
>> update the KIP with more detail about protocol error codes and API
>> exceptions.
>> 20.2. I think that’s a mistake. I’ll rectify it.
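
To make the ordering rule in 20 concrete, here is a minimal sketch of a
check that rejects acknowledgements which do not arrive in increasing offset
order per share-partition. The AckOrderChecker class and the
OutOfOrderAcknowledgement exception are hypothetical stand-ins (the thread
only floats InvalidOffsetException as a candidate), and whether gaps between
acknowledged offsets are allowed is deliberately left open here.

```python
class OutOfOrderAcknowledgement(Exception):
    """Hypothetical stand-in for the exception discussed in 20.1."""


class AckOrderChecker:
    """Tracks the last acknowledged offset for each share-partition."""

    def __init__(self):
        self._last_acked = {}  # share-partition -> last acknowledged offset

    def acknowledge(self, partition, offset):
        last = self._last_acked.get(partition)
        if last is not None and offset <= last:
            # Offsets must strictly increase within a share-partition.
            raise OutOfOrderAcknowledgement(
                f"{partition}: offset {offset} acknowledged after {last}"
            )
        self._last_acked[partition] = offset
```

Acknowledging offsets 110 then 111 on one partition succeeds; acknowledging
110 again afterwards raises the exception.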
>>
>> 21. The message sets for the new control records would be filtered out
>> for all consumers.
>>
>> 22. Fetch from follower is not supported. I will update the KIP.
>>
>> 23.1. I am not quite happy with the explanation of the checkpoint and
>> delta records. Essentially, there needs to be one checkpoint and then
>> any number of deltas. Then another checkpoint supersedes the previous
>> records, and can have its own sequence of deltas, and so on. Because
>> recovery requires the leader to read the latest checkpoint and all
>> subsequent deltas, you want to take checkpoints frequently enough to
>> speed up recovery, but infrequently enough to minimise the performance
>> impact of reserializing all the state.
>> 23.2. I’ll check the document again carefully, but the SHARE_DELTA
>> should always contain DeliveryCount for every member of the States
>> array.
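
The recovery rule in 23.1 (latest checkpoint, then every later delta) can
be sketched as follows. This is an illustrative model only: the records are
plain (kind, states) tuples rather than real control records, the recover()
helper is hypothetical, and apart from State 0 (Available), which appears in
the quoted example, the numeric state codes used with it are assumptions.

```python
def recover(records):
    """Rebuild per-offset state from a log of checkpoint and delta records.

    records: list of (kind, states) in log order, where kind is
    "SHARE_CHECKPOINT" or "SHARE_DELTA" and states is a list of dicts with
    BaseOffset, LastOffset, State and DeliveryCount.
    Returns a dict: offset -> (State, DeliveryCount).
    """
    checkpoints = [i for i, (kind, _) in enumerate(records)
                   if kind == "SHARE_CHECKPOINT"]
    if not checkpoints:
        raise ValueError("no SHARE_CHECKPOINT found")
    # Everything before the latest checkpoint is superseded by it.
    start = checkpoints[-1]
    state = {}
    for _kind, states in records[start:]:
        for entry in states:
            for offset in range(entry["BaseOffset"], entry["LastOffset"] + 1):
                # Later records overwrite earlier state for the same offset.
                state[offset] = (entry["State"], entry["DeliveryCount"])
    return state
```

The cost of recover() grows with the number of deltas after the last
checkpoint, which is the trade-off described in 23.1.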
>>
>> 24. I was anticipating adding to the index files which are part of each
>> log segment.
>>
>> 25. The acknowledgements for each topic-partition are atomic. All this
>> really means is that we perform the state checking and the state
>> persistence atomically (one control record). The callback tells you
>> whether the acknowledgements for the entire topic-partition succeeded or
>> failed, rather than each record individually. I could have gone with a
>> callback with a record-based interface. Would that be preferable, do you
>> think? For one thing, that does give more flexibility for optimisations
>> such as fetch pipelining in the future.
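
A small sketch of the all-or-nothing behaviour described in 25, under the
assumption that a record can only be acknowledged while it is in an
"acquired" state. The function name, the state names and the
callback(partition, error) signature are hypothetical and are not the
AcknowledgeCommitCallback interface from the KIP.

```python
def commit_acknowledgements(state, acks, callback, partition):
    """Apply acknowledgements for one topic-partition atomically.

    state: dict offset -> state name, updated in place on success.
    acks: dict offset -> new state name requested by the client.
    callback: called once with (partition, error), error is None on success.
    """
    # State checking: every record must currently be acquired.
    invalid = sorted(o for o in acks if state.get(o) != "acquired")
    if invalid:
        # Nothing is applied if any single acknowledgement is invalid.
        callback(partition, ValueError(f"not acquired: {invalid}"))
        return False
    # State persistence: applied in one step (one control record in the KIP).
    state.update(acks)
    callback(partition, None)
    return True
```

The callback fires once per topic-partition, so on failure the application
learns that the batch was rejected but not which record caused it, which is
the limitation raised in question 25.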
>>
>> 26. The metadata is unused. This is re-using an existing class
>> (OffsetAndMetadata). Perhaps it would be better not to.
>>
>> 27. Yes, agreed. I will add it.
>>
>> Thanks,
>> Andrew
>>
>>> On 9 Feb 2024, at 23:14, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>
>>> Hi, Andrew,
>>>
>>> Thanks for the KIP. A few comments below.
>>>
>>> 10. ShareFetchResponse: To consume transactional data, currently
>>> FetchResponse includes the AbortedTransactions fields for the client to
>>> properly skip aborted records. ShareFetchResponse doesn't include that.
>>> How do we prevent the consumer from reading aborted records in a share
>>> group?
>>>
>>> 11. "adding "share" to the existing group types of "consumer" and
>>> "classic""
>>> What's the "classic" type?
>>>
>>> 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type
>>> group --entity-name G1 --alter --add-config group.type=share
>>> So, one could change the group type? What happens to the states
>>> associated with the group (members, epoch, offsets, etc)?
>>>
>>> 13. Why is Server Assignor at member level, instead of group level?
>>>
>>> 14. Member.metadata: How is that being used? It isn't exposed to the
>>> client.
>>>
>>> 15. What public interface does SimpleAssignor implement?
>>>
>>> 16. "This means that existing members will have to rejoin the share group
>>> following a coordinator failover."
>>> When an existing member issues a ShareGroupHeartbeatRequest to the new
>>> coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new
>>> memberId?
>>>
>>> 17. auto.offset.reset has the option to throw an exception to the client
>>> if the current offset does not exist any more on the server (e.g. due to
>>> retention). Should group.share.auto.offset.reset support that too?
>>>
>>> 18. SHARE_CHECKPOINT and SHARE_DELTA records:
>>> 18.1 When does the coordinator write them?
>>> 18.2 If the client commits the acknowledgement successfully, could
>>> the acknowledgement be lost on the broker if the coordinator fails over?
>>>
>>> 19. In the current consumer model, coordinator failover doesn't cause
>>> duplicate records in consumers. In the share group model, I guess this is
>>> no longer true since we are not persisting the acquired state?
>>>
>>> 20. "The calls to KafkaShareConsumer.acknowledge(ConsumerRecord,
>>> AcknowledgeType) must be issued in the order in which the records appear
>>> in the ConsumerRecords object, which will be in order of increasing
>>> offset for each share-partition."
>>> 20.1 What happens if the acknowledge() call doesn't follow this? Does
>>> the caller get an exception?
>>> 20.2 The example with Acknowledge 119. It seems the acknowledgement is
>>> out of order since records at offset 111-118 haven't been acknowledged?
>>>
>>> 21. "Indeed, these message sets are not returned to consumer". Are we
>>> excluding those control records for non-shared consumers too?
>>>
>>> 22. The design doesn't seem to support fetching from the followers. This
>>> might be ok, but it will be useful to explicitly mention this.
>>>
>>> 23. Examples with control records for SHARE_DELTA:
>>> 23.1 Some of the state changes contain cumulative state instead of delta.
>>> For example, "record 110 (available, delivery count 1), records 111-118
>>> acquired, record 119 acknowledged" for "Acknowledge 119".
>>> 23.2 SHARE_DELTA sometimes includes available records with
>>> DeliveryCount of 0. But we don't do that for every record. What's the
>>> convention?
>>>   {
>>>     "BaseOffset": 111,
>>>     "LastOffset": 118,
>>>     "State": 0 (Available),
>>>     "DeliveryCount": 0
>>>   }
>>>
>>> 24. "when a broker becomes the leader of a share-partition, it must read
>>> the most recent SHARE_CHECKPOINT": How does a broker find this
>>> efficiently on restart?
>>>
>>> 25. AcknowledgeCommitCallback: How would an application use it? It
>>> doesn't indicate which record's acknowledgement has failed.
>>>
>>> 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
>>> Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata
>>> used? It doesn't seem there is an API to use it in either the client
>>> application or the broker.
>>>
>>> 27. It would be useful to add a section on downgradability since the KIP
>>> changes the record format in the internal offset topic.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield <
>>> andrew_schofield_j...@outlook.com> wrote:
>>>
>>>> Hi Jack,
>>>> Thanks for your comments.
>>>>
>>>> I have added a new section on Log Retention which describes the
>>>> behaviour of the SPSO as the LSO advances. That makes total sense
>>>> and was an omission from the KIP.
>>>>
>>>> I have added the other ideas as potential future work. I do like the
>>>> idea of having the SPSO influence the advancement of the LSO
>>>> for topics which are primarily being used with share groups.
>>>>
>>>> I have published an updated version of the KIP.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 4 Oct 2023, at 10:09, Jack Vanlightly <vanligh...@apache.org> wrote:
>>>>>
>>>>> I would like to see more explicit discussion of topic retention and
>>>>> share groups. There are a few options here from simple to more
>>>>> sophisticated. There are also topic-level and share-group level options.
>>>>>
>>>>> The simple thing would be to ensure that the SPSO of each share group
>>>>> is bounded by the Log Start Offset (LSO) of each partition which itself
>>>>> is managed by the retention policy. This is a topic-level control which
>>>>> applies to all share-groups. I would say that this shared retention is
>>>>> the largest drawback of modeling queues on shared logs and this is
>>>>> worth noting.
>>>>>
>>>>> More sophisticated approaches can be to allow the LSO to advance not
>>>>> (only) by retention policy but by the advancement of the lowest SPSO.
>>>>> This can keep the amount of data lower by garbage collecting messages
>>>>> that have been acknowledged by all share groups. Some people may like
>>>>> that behaviour on those topics where share groups are the only
>>>>> consumption model and no replay is needed.
>>>>>
>>>>> There are per-share-group possibilities such as share-group TTLs where
>>>>> messages can be archived on a per share group basis.
>>>>>
>>>>> Thanks
>>>>> Jack
>>>>
>>>>
>>
>>
>
