Hi Chirag,
Thanks for your question.

28. Good catch. Those options were omitted in error. I will update the KIP.
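
For illustration, I would expect these to mirror the equivalent options on kafka-consumer-groups.sh, roughly along the following lines. This is just a sketch of the syntax I have in mind; the exact option names and defaults will be confirmed when I update the KIP. --state shows or filters by the group state when describing or listing groups, and --timeout overrides the default admin client timeout in milliseconds.

  # Sketch only - hypothetical until the KIP is updated
  bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group G1 --state
  bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group G1 --timeout 15000
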
Thanks,
Andrew

> On 12 Feb 2024, at 13:06, Chirag Wadhwa <cwad...@confluent.io.INVALID> wrote:
>
> Hi Andrew,
>
> Thank you for the KIP, it is a great read ! I just have a small question.
>
> 28. I noticed that the "*--state*" and "*--timeout*" options are not mentioned for the kafka-share-groups.sh tool. Was this omission intentional, or is it possibly an oversight in the KIP?
>
> Thanks,
> Chirag
>
> On Mon, Feb 12, 2024 at 5:25 PM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>
>> Hi Jun
>> Thanks for your comments.
>>
>> 10. For read-uncommitted isolation level, the consumer just reads all records. For read-committed isolation level, the share-partition leader does the filtering to enable correct skipping of aborted records. The consumers in a share group are not aware of the filtering, unlike consumers in consumer groups.
>>
>> 11. The “classic” type is the pre-KIP 848 consumer groups.
>>
>> 12. By setting the configuration for a group resource, you are saying “when a new group is created with this name, it must have this type”. It’s not changing the type of an existing group.
>>
>> 13. Good catch. The Server Assignor should be at group level. I will change it.
>>
>> 14. That is true. I have maintained it to keep similarity with consumer groups, but it is not currently exposed to clients. It might be best to remove it.
>>
>> 15. I had intended that SimpleAssignor implements org.apache.kafka.clients.consumer.ConsumerPartitionAssignor. Actually, I think there’s benefit to using a new interface so that someone doesn’t inadvertently configure something like the RoundRobinAssignor for a share group. It wouldn’t go well. I will add a new interface to the KIP.
>>
>> 16. When an existing member issues a ShareGroupHeartbeatRequest to the new coordinator, the coordinator returns UNKNOWN_MEMBER_ID. The client then sends another ShareGroupHeartbeatRequest containing no member ID and epoch 0. The coordinator then returns the member ID.
>>
>> 17. I don’t think so. What is the client going to do with the exception? Share groups are intentionally removing some of the details of using Kafka offsets from the consumers. If the SPSO needs to be reset due to retention, it just does that automatically.
>>
>> 18. The proposed use of control records needs some careful thought.
>> 18.1. They’re written by the share-partition leader, not the coordinator.
>> 18.2. If the client commits the acknowledgement, it is only confirmed to the client once it has been replicated to the other replica brokers. So, committing an acknowledgement is very similar to sending a record to a topic in terms of the behaviour.
>>
>> 19. You are correct. The possibility of record duplication exists in failure scenarios. A future KIP will add EOS support for share groups.
>>
>> 20.1. Yes, an exception. I was thinking InvalidOffsetException. I will update the KIP with more detail about protocol error codes and API exceptions.
>> 20.2. I think that’s a mistake. I’ll rectify it.
>>
>> 21. The message sets for the new control records would be filtered out for all consumers.
>>
>> 22. Fetch from follower is not supported. I will update the KIP.
>>
>> 23.1. I am not quite happy with the explanation of the checkpoint and delta records. Essentially, there needs to be one checkpoint and then any number of deltas.
>> Then another checkpoint supersedes the previous records, and can have its own sequence of deltas, and so on. Because recovery requires the leader to read the latest checkpoint and all subsequent deltas, you want to take checkpoints frequently enough to speed up recovery, but infrequently enough to minimise the performance impact of reserializing all the state.
>> 23.2. I’ll check the document again carefully, but the SHARE_DELTA should always contain DeliveryCount for every member of the States array.
>>
>> 24. I was anticipating adding this to the index files which are part of each log segment.
>>
>> 25. The acknowledgements for each topic-partition are atomic. All this really means is that we perform the state checking and the state persistence atomically (one control record). The callback tells you whether the acknowledgements for the entire topic-partition succeeded or failed, rather than each record individually. I could have gone with a callback with a record-based interface. Would that be preferable, do you think? For one thing, that does give more flexibility for optimisations such as fetch pipelining in the future.
>>
>> 26. The metadata is unused. This is re-using an existing class (OffsetAndMetadata). Perhaps it would be better not to.
>>
>> 27. Yes, agreed. I will add it.
>>
>> Thanks,
>> Andrew
>>
>>> On 9 Feb 2024, at 23:14, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>
>>> Hi, Andrew,
>>>
>>> Thanks for the KIP. A few comments below.
>>>
>>> 10. ShareFetchResponse: To consume transactional data, currently FetchResponse includes the AbortedTransactions fields for the client to properly skip aborted records. ShareFetchResponse doesn't include that. How do we prevent the consumer from reading aborted records in a share group?
>>>
>>> 11. "adding "share" to the existing group types of "consumer" and "classic""
>>> What's the "classic" type?
>>>
>>> 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type group --entity-name G1 --alter --add-config group.type=share
>>> So, one could change the group type? What happens to the states associated with the group (members, epoch, offsets, etc)?
>>>
>>> 13. Why is Server Assignor at member level, instead of group level?
>>>
>>> 14. Member.metadata: How is that being used? It isn't exposed to the client.
>>>
>>> 15. What public interface does SimpleAssignor implement?
>>>
>>> 16. "This means that existing members will have to rejoin the share group following a coordinator failover."
>>> When an existing member issues a ShareGroupHeartbeatRequest to the new coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new memberId?
>>>
>>> 17. auto.offset.reset has the option to throw an exception to the client if the current offset does not exist any more on the server (e.g. due to retention). Should group.share.auto.offset.reset support that too?
>>>
>>> 18. SHARE_CHECKPOINT and SHARE_DELTA records:
>>> 18.1 When does the coordinator write them?
>>> 18.2 If the client commits the acknowledgement successfully, could the acknowledgement be lost on the broker if the coordinator fails over?
>>>
>>> 19. In the current consumer model, coordinator failover doesn't cause duplicate records in consumers. In the share group model, I guess this is no longer true since we are not persisting the acquired state?
>>>
>>> 20.
"The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, >>> AcknowledgeType) must be issued in the order in which the records appear >> in >>> the ConsumerRecords object, which will be in order of increasing offset >> for >>> each share-partition." >>> 20.1 What happens if the acknowledge() call doesn't follow this? Does >>> the caller get an exception? >>> 20.2 The example with Acknowledge 119. It seems the acknowledgement is >> out >>> of order since records at offset 111-118 haven't been acknowledged? >>> >>> 21. "Indeed, these message sets are not returned to consumer". Are we >>> excluding those control records for non-shared consumers too? >>> >>> 22. The design doesn't seem to support fetching from the followers. This >>> might be ok, but it will be useful to explicitly mention this. >>> >>> 23. Examples with control records for SHARE_DELTA: >>> 23.1 Some of the state changes contain cumulative state instead of delta. >>> For example, "record 110 (available, delivery count 1), records 111-118 >>> acquired, record 119 acknowledged" for "Acknowledge 119". >>> 23.2 SHARE_DELTA sometimes include available records with DeliveryCount >> of >>> 0. But we don't do that for every record. What's the convention? >>> { >>> "BaseOffset": 111, >>> "LastOffset": 118, >>> "State": 0 (Available), >>> "DeliveryCount": 0 >>> } >>> >>> 24. "when a broker becomes the leader of a share-partition, it must read >>> the most recent SHARE_CHECKPOINT": How does a broker find this >> efficiently >>> on restart? >>> >>> 25. AcknowledgeCommitCallback: How would an application use it? It >> doesn't >>> indicate which record's acknowledgement has failed. >>> >>> 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, >>> Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata >> used? >>> It doesn't seem there is an API to use it in either the client >> application >>> or the broker. >>> >>> 27. It would be useful to add a section on downgradability since the KIP >>> changes the record format in the internal offset topic. >>> >>> Thanks, >>> >>> Jun >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield < >>> andrew_schofield_j...@outlook.com> wrote: >>> >>>> Hi Jack, >>>> Thanks for your comments. >>>> >>>> I have added a new section on Log Retention which describes the >> behaviour >>>> of the SPSO as the LSO advances. That makes total sense >>>> and was an omission from the KIP. >>>> >>>> I have added the other ideas as potential future work. I do like the >> idea >>>> of having the SPSO influence the advancements of the LSO >>>> for topics which are primarily being using with share groups. >>>> >>>> I have published an updated version of the KIP. >>>> >>>> Thanks, >>>> Andrew >>>> >>>>> On 4 Oct 2023, at 10:09, Jack Vanlightly <vanligh...@apache.org> >> wrote: >>>>> >>>>> I would like to see more explicit discussion of topic retention and >>>> share groups. There are a few options here from simple to more >>>> sophisticated. There are also topic-level and share-group level options. >>>>> >>>>> The simple thing would be to ensure that the SPSO of each share group >> is >>>> bounded by the Log Start Offset (LSO) of each partition which itself is >>>> managed by the retention policy. This is a topic-level control which >>>> applies to all share-groups. I would say that this shared retention is >> the >>>> largest drawback of modeling queues on shared logs and this is worth >> noting. 
>>>>>
>>>>> More sophisticated approaches can be to allow the LSO to advance not (only) by retention policy but by the advancement of the lowest SPSO. This can keep the amount of data lower by garbage collecting messages that have been acknowledged by all share groups. Some people may like that behaviour on those topics where share groups are the only consumption model and no replay is needed.
>>>>>
>>>>> There are per-share-group possibilities such as share-group TTLs where messages can be archived on a per share group basis.
>>>>>
>>>>> Thanks
>>>>> Jack
>
> --
> Chirag Wadhwa
> Software Engineer Intern