Hi Daniel,
True, I see your point. It’s analogous to a KafkaConsumer fetching uncommitted
records but not delivering them to the application.
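Daniel's "pending" idea in the quoted discussion amounts to a small per-record
state machine. A minimal sketch, with invented names and not taken from
KIP-932: a transactional record beyond the LSO starts out pending, becomes
available for delivery when its transaction commits, and is archived (never
delivered) when it aborts.

```python
# Illustrative model (not KIP-932 code) of a "pending" record state:
# PENDING -> AVAILABLE on transaction commit, PENDING -> ARCHIVED on abort.

PENDING, AVAILABLE, ARCHIVED = "PENDING", "AVAILABLE", "ARCHIVED"

def on_transaction_end(state, committed):
    """Transition a record's state when its producing transaction ends."""
    if state != PENDING:
        return state  # records that are no longer pending are unaffected
    return AVAILABLE if committed else ARCHIVED

# Records don't "disappear": an abort archives them instead of removing them.
assert on_transaction_end(PENDING, committed=True) == AVAILABLE
assert on_transaction_end(PENDING, committed=False) == ARCHIVED
assert on_transaction_end(AVAILABLE, committed=False) == AVAILABLE
```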
Thanks,
Andrew

> On 7 Jun 2023, at 16:38, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
>
> I think the "pending" state could be the solution for reading beyond the
> LSO. Pending could indicate that a message is not yet available for
> consumption (so it won't be offered to consumers), but once its transaction
> ends, it can become "available". With a pending state, records wouldn't
> "disappear"; they would simply not show up until they become available on
> commit, or are archived on abort.
>
> Regardless, I understand that this might be some extra, unwanted
> complexity; I just thought that, with the message ordering guarantee gone,
> it would be a cool feature for share-groups. I've seen use-cases where the
> LSO being blocked for an extended period of time caused huge lag for
> traditional read_committed consumers, which could be completely avoided by
> share-groups.
>
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on 7 Jun 2023,
> at 17:28):
>
>> Hi Daniel,
>> Kind of. I don’t want a transaction abort to cause the disappearance of
>> records which are already in-flight. A “pending” state doesn’t seem
>> helpful for read_committed. There’s no such disappearance problem
>> for read_uncommitted.
>>
>> Thanks,
>> Andrew
>>
>>> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>>
>>> I agree with having a single isolation.level for the whole group; it
>>> makes sense.
>>> As for:
>>> "b) The default isolation level for a share group is read_committed, in
>>> which case the SPSO and SPEO cannot move past the LSO."
>>>
>>> With this limitation (SPEO not moving beyond the LSO), are you trying to
>>> avoid handling the complexity of some kind of a "pending" state for the
>>> uncommitted in-flight messages?
>>>
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on 7 Jun
>>> 2023, at 16:52):
>>>
>>>> Hi Daniel,
>>>> I’ve been thinking about this question and I think this area is a bit
>>>> tricky.
>>>>
>>>> If there are some consumers in a share group with isolation level
>>>> read_uncommitted and other consumers with read_committed, they have
>>>> different expectations with regard to which messages are visible when
>>>> EOS comes into the picture. It seems to me that this is not necessarily
>>>> a good thing.
>>>>
>>>> One option would be to support just read_committed in KIP-932. This
>>>> means it is unambiguous which records are in-flight, because they’re
>>>> only committed ones.
>>>>
>>>> Another option would be to have the entire share group have an
>>>> isolation level, which again gives an unambiguous set of in-flight
>>>> records but without the restriction of permitting just read_committed
>>>> behaviour.
>>>>
>>>> So, my preference is for the following:
>>>> a) A share group has an isolation level that applies to all consumers
>>>> in the group.
>>>> b) The default isolation level for a share group is read_committed, in
>>>> which case the SPSO and SPEO cannot move past the LSO.
>>>> c) For a share group with read_uncommitted isolation level, the SPSO
>>>> and SPEO can move past the LSO.
>>>> d) The kafka-configs.sh tool or the AdminClient can be used to set a
>>>> non-default value for the isolation level for a share group. The value
>>>> is applied when the first member joins.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>> Thank you for the clarification. One follow-up to read_committed mode:
>>>>> Taking the change in message ordering guarantees into account, does
>>>>> this mean that in queues, share-group consumers will be able to
>>>>> consume committed records AFTER the LSO?
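Rule (b) from the quoted proposal can be made concrete: under read_committed
the share-partition end offset (SPEO) is capped at the LSO, while under
read_uncommitted it is not. A minimal sketch of that rule; the function name
and the exact bookkeeping are invented for illustration, not taken from
KIP-932:

```python
# Sketch (invented names) of how a share-partition's end offset could
# advance under each isolation level of the share group.

READ_COMMITTED = "read_committed"
READ_UNCOMMITTED = "read_uncommitted"

def advance_speo(current_speo, fetched_up_to, lso, isolation_level):
    """Return the new SPEO after records up to 'fetched_up_to' are seen.

    Under read_committed the SPEO cannot move past the last stable offset
    (LSO); under read_uncommitted it is unconstrained by the LSO.
    """
    candidate = max(current_speo, fetched_up_to)
    if isolation_level == READ_COMMITTED:
        return min(candidate, lso)
    return candidate

# An open transaction holds the LSO at 50 while records exist up to 80:
assert advance_speo(40, 80, 50, READ_COMMITTED) == 50
assert advance_speo(40, 80, 50, READ_UNCOMMITTED) == 80
```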
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on 2 Jun
>>>>> 2023, at 10:39):
>>>>>
>>>>>> Hi Daniel,
>>>>>> Thanks for your questions.
>>>>>>
>>>>>> 1) Yes, read_committed fetch will still be possible.
>>>>>>
>>>>>> 2) You weren’t wrong that this is a broad question :)
>>>>>>
>>>>>> Broadly speaking, I can see two ways of managing the in-flight
>>>>>> records: the share-partition leader does it, or the share-group
>>>>>> coordinator does it. I want to choose what works best, and I happen
>>>>>> to have started with trying the share-partition leader doing it. This
>>>>>> is just a whiteboard exercise at the moment, looking at the potential
>>>>>> protocol flows and how well it all hangs together. When I have
>>>>>> something coherent and understandable and worth reviewing, I’ll
>>>>>> update the KIP with a proposal.
>>>>>>
>>>>>> I think it’s probably worth doing a similar exercise for the
>>>>>> share-group coordinator way too. There are bound to be pros and cons,
>>>>>> and I don’t really mind which way prevails.
>>>>>>
>>>>>> If the share-group coordinator does it, I already have experience of
>>>>>> efficient storage of in-flight record state in a way that scales and
>>>>>> is space-efficient. If the share-partition leader does it, storage of
>>>>>> in-flight state is a bit more tricky.
>>>>>>
>>>>>> I think it’s worth thinking ahead to how EOS will work and also
>>>>>> another couple of enhancements (key-based ordering and acquisition
>>>>>> lock extension) so it’s somewhat future-proof.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>>>> I have 2 questions:
>>>>>>> 1. I understand that EOS won't be supported for share-groups (yet),
>>>>>>> but read_committed fetch will still be possible, correct?
>>>>>>>
>>>>>>> 2. I have a very broad question about the proposed solution: why not
>>>>>>> let the share-group coordinator manage the states of the in-flight
>>>>>>> records?
>>>>>>> I'm asking this because it seems to me that using the same pattern
>>>>>>> as the existing group coordinator would:
>>>>>>> a) solve the durability of the message state storage (same method as
>>>>>>> the one used by the current group coordinator)
>>>>>>> b) pave the way for EOS with share-groups (same method as the one
>>>>>>> used by the current group coordinator)
>>>>>>> c) allow follower fetching
>>>>>>> I saw your point about this: "FFF gives freedom to fetch records
>>>>>>> from a nearby broker, but it does not also give the ability to
>>>>>>> commit offsets to a nearby broker"
>>>>>>> But does it matter if message acknowledgement is not "local"?
>>>>>>> Supposedly, fetching is the actual hard work which benefits from
>>>>>>> follower fetching, not the group-related requests.
>>>>>>>
>>>>>>> The only problem I see with the share-group coordinator managing the
>>>>>>> in-flight message state is that the coordinator is not aware of the
>>>>>>> exact available offsets of a partition, nor how the messages are
>>>>>>> batched. For this problem, maybe the share-group coordinator could
>>>>>>> use some form of "logical" addresses, such as "the next 2 batches
>>>>>>> after offset X", or "after offset X, skip 2 batches, fetch next 2".
>>>>>>> Acknowledgements always contain the exact offset, but for the
>>>>>>> "unknown" sections of a partition, these logical addresses would be
>>>>>>> used. The coordinator could keep track of message states with a mix
>>>>>>> of offsets and these batch-based addresses.
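Daniel's batch-based "logical address" could look roughly like this. The
representation and names below are invented for illustration (they are not
part of any KIP): the coordinator issues an address like "after offset X,
skip S batches, fetch N batches", and only the partition leader, which knows
the actual batch layout, can resolve it to concrete offsets.

```python
# Illustrative sketch of resolving a batch-based logical address against
# the partition's batch layout, which only the partition leader knows.

def resolve(batches, after_offset, skip, fetch):
    """batches: list of (base_offset, last_offset) tuples in offset order.
    Returns the batches selected by the logical address."""
    following = [b for b in batches if b[0] > after_offset]
    return following[skip:skip + fetch]

# Partition layout known only to the leader: four record batches.
layout = [(0, 9), (10, 19), (20, 29), (30, 39)]

# "After offset 9, skip 1 batch, fetch the next 2" selects 20-29 and 30-39.
assert resolve(layout, 9, 1, 2) == [(20, 29), (30, 39)]
```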
>>>>>>> The partition leader could support "skip X, fetch Y batches" fetch
>>>>>>> requests. This solution would need changes in the Fetch API to allow
>>>>>>> such batch-based addresses, but I assume that fetch protocol changes
>>>>>>> will be needed regardless of the specific solution.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Daniel
>>>>>>>
>>>>>>> Andrew Schofield <andrew_schofi...@live.com> wrote (on 30 May 2023,
>>>>>>> at 18:15):
>>>>>>>
>>>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing
>>>>>>>> the share group membership, and consumers that fetch records from
>>>>>>>> their assigned partitions and acknowledge when delivery completes.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for the explanation!
>>>>>>>>>
>>>>>>>>> So effectively, a share group is subscribed to each partition -
>>>>>>>>> but the data is not pushed to the consumer; it is only sent on
>>>>>>>>> demand. And when demand is signalled, a batch of messages is sent?
>>>>>>>>> Hence it would be up to the consumer to prefetch a sufficient
>>>>>>>>> number of batches to ensure that it will never be "bored"?
>>>>>>>>>
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield
>>>>>>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Adam,
>>>>>>>>>> Thanks for your question.
>>>>>>>>>>
>>>>>>>>>> With a share group, each fetch is able to grab available records
>>>>>>>>>> from any partition. So, it alleviates the “head-of-line” blocking
>>>>>>>>>> problem where a slow consumer gets in the way. There’s no actual
>>>>>>>>>> stealing from a slow consumer, but it can be overtaken and must
>>>>>>>>>> complete its processing within the timeout.
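The overtake-on-timeout behaviour Andrew describes can be modelled as an
acquisition lock on each in-flight record. This is only a rough sketch with
invented names (KIP-932's actual state machine and timeouts may differ): a
consumer acquires a record for a bounded time; if it does not acknowledge
before the lock expires, the record becomes available again and another
consumer can take it over.

```python
# Rough model (invented names) of a share-group record's acquisition lock:
# a slow consumer holding an expired lock is overtaken, not interrupted.

class InFlightRecord:
    def __init__(self, offset, lock_duration_s=30.0):
        self.offset = offset
        self.lock_duration_s = lock_duration_s
        self.state = "AVAILABLE"
        self.lock_deadline = None

    def try_acquire(self, now):
        # Release an expired lock first: this is how a slow consumer
        # gets overtaken by another member of the share group.
        if self.state == "ACQUIRED" and now >= self.lock_deadline:
            self.state = "AVAILABLE"
        if self.state != "AVAILABLE":
            return False
        self.state = "ACQUIRED"
        self.lock_deadline = now + self.lock_duration_s
        return True

    def acknowledge(self, now):
        # Only the holder of a live lock can complete processing.
        if self.state == "ACQUIRED" and now < self.lock_deadline:
            self.state = "ACKNOWLEDGED"
            return True
        return False

rec = InFlightRecord(offset=42, lock_duration_s=30.0)
assert rec.try_acquire(now=0.0)       # a consumer grabs the record
assert not rec.try_acquire(now=10.0)  # still locked for another consumer
assert rec.try_acquire(now=31.0)      # lock expired: record is overtaken
assert rec.acknowledge(now=40.0)      # the new holder acknowledges in time
```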
>>>>>>>>>>
>>>>>>>>>> The way I see this working is that when a consumer joins a share
>>>>>>>>>> group, it receives a set of assigned share-partitions. To start
>>>>>>>>>> with, every consumer will be assigned all partitions. We can be
>>>>>>>>>> smarter than that, but I think that’s really a question of
>>>>>>>>>> writing a smarter assignor, just as has occurred over the years
>>>>>>>>>> with consumer groups.
>>>>>>>>>>
>>>>>>>>>> Only a small proportion of Kafka workloads are super high
>>>>>>>>>> throughput. Share groups would struggle with those, I’m sure.
>>>>>>>>>> Share groups do not diminish the value of consumer groups for
>>>>>>>>>> streaming. They just give another option for situations where a
>>>>>>>>>> different style of consumption is more appropriate.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>>>
>>>>>>>>>>> I do have one question, though. When you subscribe to a topic
>>>>>>>>>>> using consumer groups, it might happen that one consumer has
>>>>>>>>>>> processed all messages from its partitions, while another one
>>>>>>>>>>> still has a lot of work to do (this might be due to unbalanced
>>>>>>>>>>> partitioning, long processing times etc.). In a message-queue
>>>>>>>>>>> approach, it would be great to solve this problem - so that a
>>>>>>>>>>> consumer that is free can steal work from other consumers. Is
>>>>>>>>>>> this somehow covered by share groups?
>>>>>>>>>>>
>>>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>>>
>>>>>>>>>>> "
>>>>>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>>>>>>> members.
>>>>>>>>>>> An initial, trivial implementation would be to give each member
>>>>>>>>>>> the list of all topic-partitions which match its subscriptions
>>>>>>>>>>> and then use the pull-based protocol to fetch records from all
>>>>>>>>>>> partitions. A more sophisticated implementation could use
>>>>>>>>>>> topic-partition load and lag metrics to distribute partitions
>>>>>>>>>>> among the consumers as a kind of autonomous, self-balancing
>>>>>>>>>>> partition assignment, steering more consumers to busier
>>>>>>>>>>> partitions, for example. Alternatively, a push-based fetching
>>>>>>>>>>> scheme could be used. Protocol details will follow later.
>>>>>>>>>>> "
>>>>>>>>>>>
>>>>>>>>>>> but I’m not sure if I understand this correctly. A
>>>>>>>>>>> fully-connected graph seems like a lot of connections, and I’m
>>>>>>>>>>> not sure if this would play well with streaming.
>>>>>>>>>>>
>>>>>>>>>>> This also seems like one of the central problems - a key
>>>>>>>>>>> differentiator between share and consumer groups (the other one
>>>>>>>>>>> being persisting the state of messages). And maybe the exact way
>>>>>>>>>>> we’d want to approach this would, to a certain degree, dictate
>>>>>>>>>>> the design of the queueing system?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Adam Warski
>>>>>>>>>>>
>>>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues
>>>>>>>>>>>> for Kafka. This KIP proposes an alternative to consumer groups
>>>>>>>>>>>> to enable cooperative consumption by consumers without
>>>>>>>>>>>> partition assignment. You end up with queue semantics on top of
>>>>>>>>>>>> regular Kafka topics, with per-message acknowledgement and
>>>>>>>>>>>> automatic handling of messages which repeatedly fail to be
>>>>>>>>>>>> processed.
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>>>
>>>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Adam Warski
>>>>>>>>>
>>>>>>>>> https://www.softwaremill.com/
>>>>>>>>> https://twitter.com/adamwarski
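The "initial, trivial implementation" of assignment quoted in the thread
(every member gets every topic-partition matching its subscriptions) is
simple to picture. A sketch with invented names; the real assignor interface
is not defined in this thread:

```python
# Sketch of the trivial share-group assignment quoted in the thread:
# each member is assigned all partitions of all topics it subscribes to,
# and members overlap freely since ordering is not guaranteed.

def trivial_assign(members, topic_partitions):
    """members: {member_id: set of subscribed topics}
    topic_partitions: {topic: partition_count}
    Returns {member_id: [(topic, partition), ...]}."""
    assignment = {}
    for member_id, topics in members.items():
        assignment[member_id] = [
            (topic, p)
            for topic in sorted(topics & topic_partitions.keys())
            for p in range(topic_partitions[topic])
        ]
    return assignment

members = {"c1": {"orders"}, "c2": {"orders", "audit"}}
partitions = {"orders": 2, "audit": 1}
result = trivial_assign(members, partitions)

# Both consumers share all "orders" partitions; only c2 also gets "audit".
assert result["c1"] == [("orders", 0), ("orders", 1)]
assert result["c2"] == [("audit", 0), ("orders", 0), ("orders", 1)]
```

A smarter assignor, as the KIP text suggests, could instead weight the
per-partition lists by load and lag metrics rather than assigning everything
to everyone.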