Hi Daniel,
Kind of. I don’t want a transaction abort to cause records which are already in-flight to disappear. A “pending” state doesn’t seem helpful for read_committed. There’s no such disappearance problem for read_uncommitted.
Thanks,
Andrew

> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
>
> I agree with having a single isolation.level for the whole group, it makes sense.
> As for:
> "b) The default isolation level for a share group is read_committed, in which case the SPSO and SPEO cannot move past the LSO."
>
> With this limitation (the SPEO not moving beyond the LSO), are you trying to avoid handling the complexity of some kind of a "pending" state for the uncommitted in-flight messages?
>
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (7 Jun 2023, 16:52):
>
>> Hi Daniel,
>> I’ve been thinking about this question and I think this area is a bit tricky.
>>
>> If there are some consumers in a share group with isolation level read_uncommitted and other consumers with read_committed, they have different expectations with regard to which messages are visible when EOS comes into the picture. It seems to me that this is not necessarily a good thing.
>>
>> One option would be to support just read_committed in KIP-932. This makes it unambiguous which records are in-flight, because only committed records can be in-flight.
>>
>> Another option would be to have the entire share group use a single isolation level, which again gives an unambiguous set of in-flight records but without the restriction of permitting just read_committed behaviour.
>>
>> So, my preference is for the following:
>> a) A share group has an isolation level that applies to all consumers in the group.
>> b) The default isolation level for a share group is read_committed, in which case the SPSO and SPEO cannot move past the LSO.
>> c) For a share group with the read_uncommitted isolation level, the SPSO and SPEO can move past the LSO.
>> d) The kafka-configs.sh tool or the AdminClient can be used to set a non-default value for the isolation level for a share group. The value is applied when the first member joins.
>>
>> Thanks,
>> Andrew
>>
>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>> Thank you for the clarification. One follow-up to read_committed mode:
>>> Taking the change in message ordering guarantees into account, does this mean that in queues, share-group consumers will be able to consume committed records AFTER the LSO?
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (2 Jun 2023, 10:39):
>>>
>>>> Hi Daniel,
>>>> Thanks for your questions.
>>>>
>>>> 1) Yes, read_committed fetch will still be possible.
>>>>
>>>> 2) You weren’t wrong that this is a broad question :)
>>>>
>>>> Broadly speaking, I can see two ways of managing the in-flight records: the share-partition leader does it, or the share-group coordinator does it. I want to choose what works best, and I happen to have started with trying the share-partition leader doing it. This is just a whiteboard exercise at the moment, looking at the potential protocol flows and how well it all hangs together. When I have something coherent and understandable and worth reviewing, I’ll update the KIP with a proposal.
>>>>
>>>> I think it’s probably worth doing a similar exercise for the share-group coordinator way too. There are bound to be pros and cons, and I don’t really mind which way prevails.
>>>>
>>>> If the share-group coordinator does it, I already have experience of efficient storage of in-flight record state in a way that scales and is space-efficient. If the share-partition leader does it, storage of in-flight state is a bit more tricky.
>>>>
>>>> I think it’s worth thinking ahead to how EOS will work and also another couple of enhancements (key-based ordering and acquisition lock extension) so it’s somewhat future-proof.
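[Editor's note: point (d) above might look roughly like the following on the command line. This is purely an illustrative sketch: neither the entity type nor the config name below is defined by KIP-932 at this point in the discussion; they are placeholders.]

```shell
# Illustrative only: "groups" as a dynamic-config entity type and the
# config name "group.share.isolation.level" are placeholders for whatever
# KIP-932 eventually defines, not existing kafka-configs.sh options.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type groups --entity-name my-share-group \
  --add-config 'group.share.isolation.level=read_uncommitted'
```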
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>> I have 2 questions:
>>>>> 1. I understand that EOS won’t be supported for share groups (yet), but read_committed fetch will still be possible, correct?
>>>>>
>>>>> 2. I have a very broad question about the proposed solution: why not let the share-group coordinator manage the states of the in-flight records? I’m asking this because it seems to me that using the same pattern as the existing group coordinator would:
>>>>> a) solve the durability of the message state storage (same method as the one used by the current group coordinator)
>>>>> b) pave the way for EOS with share groups (same method as the one used by the current group coordinator)
>>>>> c) allow follower fetching
>>>>> I saw your point about this: "FFF gives freedom to fetch records from a nearby broker, but it does not also give the ability to commit offsets to a nearby broker."
>>>>> But does it matter if message acknowledgement is not "local"? Supposedly, fetching is the actual hard work which benefits from follower fetching, not the group-related requests.
>>>>>
>>>>> The only problem I see with the share-group coordinator managing the in-flight message state is that the coordinator is aware of neither the exact available offsets of a partition nor how the messages are batched. For this problem, maybe the share-group coordinator could use some form of "logical" addresses, such as "the next 2 batches after offset X", or "after offset X, skip 2 batches, fetch next 2". Acknowledgements always contain the exact offset, but for the "unknown" sections of a partition, these logical addresses would be used. The coordinator could keep track of message states with a mix of offsets and these batch-based addresses. The partition leader could support "skip X, fetch Y batches" fetch requests. This solution would need changes in the Fetch API to allow such batch-based addresses, but I assume that fetch protocol changes will be needed regardless of the specific solution.
>>>>>
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>> Andrew Schofield <andrew_schofi...@live.com> wrote (30 May 2023, 18:15):
>>>>>
>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing the share group membership, and consumers that fetch records from their assigned partitions and acknowledge when delivery completes.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>
>>>>>>> Thanks for the explanation!
>>>>>>>
>>>>>>> So effectively, a share group is subscribed to each partition - but the data is not pushed to the consumer, only sent on demand. And when demand is signalled, a batch of messages is sent? Hence it would be up to the consumer to prefetch a sufficient number of batches to ensure that it will never be "bored"?
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>>>>>
>>>>>>>> Hi Adam,
>>>>>>>> Thanks for your question.
>>>>>>>>
>>>>>>>> With a share group, each fetch is able to grab available records from any partition. So, it alleviates the "head-of-line" blocking problem where a slow consumer gets in the way. There’s no actual stealing from a slow consumer, but it can be overtaken and must complete its processing within the timeout.
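[Editor's note: Daniel's batch-based "logical address" idea above can be sketched as follows. Every name and shape here is hypothetical; nothing like this exists in the Kafka protocol, and the KIP has not adopted this design.]

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: the coordinator refers to not-yet-seen sections of a
// partition relative to a known offset ("after offset X, skip 2 batches,
// fetch next 2"), and the partition leader, which knows the real batch
// layout, resolves that to concrete batch base offsets.
public class LogicalBatchAddress {
    final long afterOffset;  // last offset the coordinator knows exactly
    final int skipBatches;   // whole batches to skip past that offset
    final int fetchBatches;  // batches to fetch after skipping

    public LogicalBatchAddress(long afterOffset, int skipBatches, int fetchBatches) {
        this.afterOffset = afterOffset;
        this.skipBatches = skipBatches;
        this.fetchBatches = fetchBatches;
    }

    // The leader resolves the logical address against the base offsets of
    // the partition's actual record batches.
    public List<Long> resolve(List<Long> batchBaseOffsets) {
        return batchBaseOffsets.stream()
                .filter(base -> base > afterOffset)
                .skip(skipBatches)
                .limit(fetchBatches)
                .collect(Collectors.toList());
    }
}
```

For example, "after offset 100, skip 2 batches, fetch next 2" against batches starting at 0, 50, 101, 150, 200, 250 resolves to the batches at 200 and 250.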
>>>>>>>>
>>>>>>>> The way I see this working is that when a consumer joins a share group, it receives a set of assigned share-partitions. To start with, every consumer will be assigned all partitions. We can be smarter than that, but I think that’s really a question of writing a smarter assignor, just as has occurred over the years with consumer groups.
>>>>>>>>
>>>>>>>> Only a small proportion of Kafka workloads are super high throughput. Share groups would struggle with those, I’m sure. Share groups do not diminish the value of consumer groups for streaming. They just give another option for situations where a different style of consumption is more appropriate.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>
>>>>>>>>> I do have one question, though. When you subscribe to a topic using consumer groups, it might happen that one consumer has processed all messages from its partitions, while another one still has a lot of work to do (this might be due to unbalanced partitioning, long processing times, etc.). In a message-queue approach, it would be great to solve this problem, so that a consumer that is free can steal work from other consumers. Is this somehow covered by share groups?
>>>>>>>>>
>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>
>>>>>>>>> "It manages the topic-partition assignments for the share-group members. An initial, trivial implementation would be to give each member the list of all topic-partitions which matches its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used. Protocol details will follow later."
>>>>>>>>>
>>>>>>>>> but I’m not sure if I understand this correctly. A fully-connected graph seems like a lot of connections, and I’m not sure if this would play well with streaming.
>>>>>>>>>
>>>>>>>>> This also seems like one of the central problems - a key differentiator between share groups and consumer groups (the other one being persisting the state of messages). And maybe the exact way we’d want to approach this would, to a certain degree, dictate the design of the queueing system?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Adam Warski
>>>>>>>>>
>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for Kafka. This KIP proposes an alternative to consumer groups to enable cooperative consumption by consumers without partition assignment. You end up with queue semantics on top of regular Kafka topics, with per-message acknowledgement and automatic handling of messages which repeatedly fail to be processed.
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>
>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Adam Warski
>>>>>>>
>>>>>>> https://www.softwaremill.com/
>>>>>>> https://twitter.com/adamwarski
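
[Editor's note: the per-message acknowledgement and repeated-failure handling the KIP announcement describes can be illustrated with a toy in-memory model. This is purely an illustration of the delivery semantics; it is not the proposed broker implementation, protocol, or API, and all names are hypothetical.]

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model: records are delivered one at a time; a consumer either
// acknowledges a record or releases it (failure / acquisition timeout),
// in which case it becomes available again until its delivery attempts
// are exhausted, after which it is set aside.
public class ToyShareQueue {
    private final int maxDeliveryAttempts;
    private final Queue<Long> available = new ArrayDeque<>();   // offsets ready to deliver
    private final Map<Long, Integer> deliveryCount = new HashMap<>();
    private final Queue<Long> deadLetters = new ArrayDeque<>(); // repeatedly failed records

    public ToyShareQueue(int maxDeliveryAttempts, long... offsets) {
        this.maxDeliveryAttempts = maxDeliveryAttempts;
        for (long o : offsets) available.add(o);
    }

    /** Deliver the next available record, or null if none is available. */
    public Long poll() {
        Long offset = available.poll();
        if (offset != null) deliveryCount.merge(offset, 1, Integer::sum);
        return offset;
    }

    /** The consumer processed the record successfully. */
    public void acknowledge(long offset) {
        deliveryCount.remove(offset);
    }

    /** The consumer failed (or timed out); redeliver later unless the
        record has exhausted its delivery attempts. */
    public void release(long offset) {
        if (deliveryCount.getOrDefault(offset, 0) >= maxDeliveryAttempts) {
            deadLetters.add(offset);
        } else {
            available.add(offset);
        }
    }

    public Queue<Long> deadLetters() { return deadLetters; }
}
```

With two allowed delivery attempts, a record that is released twice ends up set aside while acknowledged records simply leave the queue.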