Hi Andrew, I think the "pending" state could be the solution for reading beyond the LSO. Pending could indicate that a message is not yet available for consumption (so pending messages won't be offered to consumers), but once the transaction ends, they can become "available". With a pending state, records wouldn't "disappear"; they would simply not show up until they become available on commit, or archived on abort.
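To make this concrete, here's a rough sketch of how per-record state could work (the names and API are entirely made up, just to illustrate the transitions, not anything from KIP-932):

```python
from enum import Enum, auto

class RecordState(Enum):
    PENDING = auto()    # produced in an open transaction; not offered to consumers
    AVAILABLE = auto()  # offered to share-group consumers
    ARCHIVED = auto()   # transaction aborted; never offered

class SharePartitionState:
    """Hypothetical per-record delivery state, keyed by offset."""

    def __init__(self):
        self.states = {}

    def on_transactional_produce(self, offset):
        # Records beyond the LSO start out pending instead of being invisible.
        self.states[offset] = RecordState.PENDING

    def on_transaction_end(self, offsets, committed):
        # On commit, pending records become available; on abort they are
        # archived, so nothing ever "disappears" from in-flight tracking.
        for o in offsets:
            if self.states.get(o) is RecordState.PENDING:
                self.states[o] = (RecordState.AVAILABLE if committed
                                  else RecordState.ARCHIVED)

    def fetchable(self):
        # Only available records are offered on fetch.
        return sorted(o for o, s in self.states.items()
                      if s is RecordState.AVAILABLE)
```

With something like this, the SPEO could move past the LSO while read_committed consumers still only ever receive committed records.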
Regardless, I understand that this might be some extra, unwanted complexity; I just thought that with the message ordering guarantee gone, it would be a cool feature for share-groups. I've seen use cases where the LSO being blocked for an extended period of time caused huge lag for traditional read_committed consumers, which could be completely avoided by share-groups. Thanks, Daniel Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on 7 Jun 2023, Wed, 17:28): > Hi Daniel, > Kind of. I don’t want a transaction abort to cause disappearance of > records which are already in-flight. A “pending” state doesn’t seem > helpful for read_committed. There’s no such disappearance problem > for read_uncommitted. > > Thanks, > Andrew > > > On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote: > > > > Hi Andrew, > > > > I agree with having a single isolation.level for the whole group, it > makes > > sense. > > As for: > > "b) The default isolation level for a share group is read_committed, in > > which case > > the SPSO and SPEO cannot move past the LSO." > > > > With this limitation (SPEO not moving beyond LSO), are you trying to > avoid > > handling the complexity of some kind of a "pending" state for the > > uncommitted in-flight messages? > > > > Thanks, > > Daniel > > > > Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on > > 7 Jun 2023, Wed, 16:52): > > >> Hi Daniel, > >> I’ve been thinking about this question and I think this area is a bit > >> tricky. > >> > >> If there are some consumers in a share group with isolation level > >> read_uncommitted > >> and other consumers with read_committed, they have different > expectations > >> with > >> regards to which messages are visible when EOS comes into the picture. > >> It seems to me that this is not necessarily a good thing. > >> > >> One option would be to support just read_committed in KIP-932. 
This > means > >> it is unambiguous which records are in-flight, because they’re only the > >> committed > >> ones. > >> > >> Another option would be to have the entire share group have an isolation > >> level, > >> which again gives an unambiguous set of in-flight records but without > the > >> restriction of permitting just read_committed behaviour. > >> > >> So, my preference is for the following: > >> a) A share group has an isolation level that applies to all consumers in > >> the group. > >> b) The default isolation level for a share group is read_committed, in > >> which case > >> the SPSO and SPEO cannot move past the LSO. > >> c) For a share group with read_uncommitted isolation level, the SPSO and > >> SPEO > >> can move past the LSO. > >> d) The kafka-configs.sh tool or the AdminClient can be used to set a > >> non-default > >> value for the isolation level for a share group. The value is applied > when > >> the first > >> member joins. > >> > >> Thanks, > >> Andrew > >> > >>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote: > >>> > >>> Hi Andrew, > >>> Thank you for the clarification. One follow-up to read_committed mode: > >>> Taking the change in message ordering guarantees into account, does > this > >>> mean that in queues, share-group consumers will be able to consume > >>> committed records AFTER the LSO? > >>> Thanks, > >>> Daniel > >>> > >>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote > (on > >>> 2 Jun 2023, Fri, 10:39): > >>> > >>>> Hi Daniel, > >>>> Thanks for your questions. > >>>> > >>>> 1) Yes, read_committed fetch will still be possible. > >>>> > >>>> 2) You weren’t wrong that this is a broad question :) > >>>> > >>>> Broadly speaking, I can see two ways of managing the in-flight > records: > >>>> the share-partition leader does it, or the share-group coordinator > does > >> it. 
> >>>> I want to choose what works best, and I happen to have started with > >> trying > >>>> the share-partition leader doing it. This is just a whiteboard > exercise > >> at > >>>> the > >>>> moment, looking at the potential protocol flows and how well it all > >> hangs > >>>> together. When I have something coherent and understandable and worth > >>>> reviewing, I’ll update the KIP with a proposal. > >>>> > >>>> I think it’s probably worth doing a similar exercise for the > share-group > >>>> coordinator way too. There are bound to be pros and cons, and I don’t > >>>> really > >>>> mind which way prevails. > >>>> > >>>> If the share-group coordinator does it, I already have experience of > >>>> efficient > >>>> storage of in-flight record state in a way that scales and is > >>>> space-efficient. > >>>> If the share-partition leader does it, storage of in-flight state is a > >> bit > >>>> more > >>>> tricky. > >>>> > >>>> I think it’s worth thinking ahead to how EOS will work and also > another > >>>> couple of enhancements (key-based ordering and acquisition lock > >>>> extension) so it’s somewhat future-proof. > >>>> > >>>> Thanks, > >>>> Andrew > >>>> > >>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote: > >>>>> > >>>>> Hi Andrew, > >>>>> > >>>>> Thank you for the KIP, exciting work you are doing :) > >>>>> I have 2 questions: > >>>>> 1. I understand that EOS won't be supported for share-groups (yet), > but > >>>>> read_committed fetch will still be possible, correct? > >>>>> > >>>>> 2. I have a very broad question about the proposed solution: why not > >> let > >>>>> the share-group coordinator manage the states of the in-flight > records? 
> >>>>> I'm asking this because it seems to me that using the same pattern as > >> the > >>>>> existing group coordinator would > >>>>> a, solve the durability of the message state storage (same method as > >> the > >>>>> one used by the current group coordinator) > >>>>> > >>>>> b, pave the way for EOS with share-groups (same method as the one > used > >> by > >>>>> the current group coordinator) > >>>>> > >>>>> c, allow follower-fetching > >>>>> I saw your point about this: "FFF gives freedom to fetch records > from a > >>>>> nearby broker, but it does not also give the ability to commit > offsets > >>>> to a > >>>>> nearby broker" > >>>>> But does it matter if message acknowledgement is not "local"? > >> Supposedly, > >>>>> fetching is the actual hard work which benefits from follower > fetching, > >>>> not > >>>>> the group related requests. > >>>>> > >>>>> The only problem I see with the share-group coordinator managing the > >>>>> in-flight message state is that the coordinator is not aware of the > >> exact > >>>>> available offsets of a partition, nor how the messages are batched. > For > >>>>> this problem, maybe the share group coordinator could use some form > of > >>>>> "logical" addresses, such as "the next 2 batches after offset X", or > >>>> "after > >>>>> offset X, skip 2 batches, fetch next 2". Acknowledgements always > >> contain > >>>>> the exact offset, but for the "unknown" sections of a partition, > these > >>>>> logical addresses would be used. The coordinator could keep track of > >>>>> message states with a mix of offsets and these batch based addresses. > >> The > >>>>> partition leader could support "skip X, fetch Y batches" fetch > >> requests. > >>>>> This solution would need changes in the Fetch API to allow such batch > >>>> based > >>>>> addresses, but I assume that fetch protocol changes will be needed > >>>>> regardless of the specific solution. 
> >>>>> > >>>>> Thanks, > >>>>> Daniel > >>>>> > >>>>> Andrew Schofield <andrew_schofi...@live.com> wrote (on > 30 May > >>>> 2023, > >>>>> Tue, 18:15): > >>>>> > >>>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing > >> the > >>>>>> share group > >>>>>> membership, and consumers that fetch records from their assigned > >>>>>> partitions and > >>>>>> acknowledge when delivery completes. > >>>>>> > >>>>>> Thanks, > >>>>>> Andrew > >>>>>> > >>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote: > >>>>>>> > >>>>>>> Thanks for the explanation! > >>>>>>> > >>>>>>> So effectively, a share group is subscribed to each partition - but > >> the > >>>>>> data is not pushed to the consumer, but only sent on demand. And > when > >>>>>> demand is signalled, a batch of messages is sent? > >>>>>>> Hence it would be up to the consumer to prefetch a sufficient > number > >> of > >>>>>> batches to ensure that it will never be "bored"? > >>>>>>> > >>>>>>> Adam > >>>>>>> > >>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield < > >> andrew_schofi...@live.com > >>>>> > >>>>>> wrote: > >>>>>>>> > >>>>>>>> Hi Adam, > >>>>>>>> Thanks for your question. > >>>>>>>> > >>>>>>>> With a share group, each fetch is able to grab available records > >> from > >>>>>> any partition. So, it alleviates > >>>>>>>> the “head-of-line” blocking problem where a slow consumer gets in > >> the > >>>>>> way. There’s no actual > >>>>>>>> stealing from a slow consumer, but it can be overtaken and must > >>>>>> complete its processing within > >>>>>>>> the timeout. > >>>>>>>> > >>>>>>>> The way I see this working is that when a consumer joins a share > >>>> group, > >>>>>> it receives a set of > >>>>>>>> assigned share-partitions. To start with, every consumer will be > >>>>>> assigned all partitions. 
We > >>>>>>>> can be smarter than that, but I think that’s really a question of > >>>>>> writing a smarter assignor > >>>>>>>> just as has occurred over the years with consumer groups. > >>>>>>>> > >>>>>>>> Only a small proportion of Kafka workloads are super high > >> throughput. > >>>>>> Share groups would > >>>>>>>> struggle with those I’m sure. Share groups do not diminish the > value > >>>> of > >>>>>> consumer groups > >>>>>>>> for streaming. They just give another option for situations where > a > >>>>>> different style of > >>>>>>>> consumption is more appropriate. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Andrew > >>>>>>>> > >>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote: > >>>>>>>>> > >>>>>>>>> Hello, > >>>>>>>>> > >>>>>>>>> thank you for the proposal! A very interesting read. > >>>>>>>>> > >>>>>>>>> I do have one question, though. When you subscribe to a topic > using > >>>>>> consumer groups, it might happen that one consumer has processed all > >>>>>> messages from its partitions, while another one still has a lot of > >> work > >>>> to > >>>>>> do (this might be due to unbalanced partitioning, long processing > >> times > >>>>>> etc.). In a message-queue approach, it would be great to solve this > >>>> problem > >>>>>> - so that a consumer that is free can steal work from other > consumers. > >>>> Is > >>>>>> this somehow covered by share groups? > >>>>>>>>> > >>>>>>>>> Maybe this is planned as "further work", as indicated here: > >>>>>>>>> > >>>>>>>>> " > >>>>>>>>> It manages the topic-partition assignments for the share-group > >>>>>> members. An initial, trivial implementation would be to give each > >> member > >>>>>> the list of all topic-partitions which matches its subscriptions and > >>>> then > >>>>>> use the pull-based protocol to fetch records from all partitions. 
A > >> more > >>>>>> sophisticated implementation could use topic-partition load and lag > >>>> metrics > >>>>>> to distribute partitions among the consumers as a kind of > autonomous, > >>>>>> self-balancing partition assignment, steering more consumers to > busier > >>>>>> partitions, for example. Alternatively, a push-based fetching scheme > >>>> could > >>>>>> be used. Protocol details will follow later. > >>>>>>>>> " > >>>>>>>>> > >>>>>>>>> but I’m not sure if I understand this correctly. A > fully-connected > >>>>>> graph seems like a lot of connections, and I’m not sure if this > would > >>>> play > >>>>>> well with streaming. > >>>>>>>>> > >>>>>>>>> This also seems as one of the central problems - a key > >> differentiator > >>>>>> between share and consumer groups (the other one being persisting > >> state > >>>> of > >>>>>> messages). And maybe the exact way we’d want to approach this would, > >> to > >>>> a > >>>>>> certain degree, dictate the design of the queueing system? > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Adam Warski > >>>>>>>>> > >>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote: > >>>>>>>>>> Hi, > >>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for > >>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable > >>>>>> cooperative consumption by consumers without partition assignment. > You > >>>> end > >>>>>> up with queue semantics on top of regular Kafka topics, with > >> per-message > >>>>>> acknowledgement and automatic handling of messages which repeatedly > >>>> fail to > >>>>>> be processed. > >>>>>>>>>> > >>>>>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka > >>>>>>>>>> > >>>>>>>>>> Please take a look and let me know what you think. > >>>>>>>>>> > >>>>>>>>>> Thanks. 
> >>>>>>>>>> Andrew > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Adam Warski > >>>>>>> > >>>>>>> https://www.softwaremill.com/ > >>>>>>> https://twitter.com/adamwarski > > >
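Coming back to the "logical address" idea earlier in the thread ("after offset X, skip 2 batches, fetch next 2"): the partition leader could resolve such an address against its actual batch layout roughly like this. This is only a sketch; the function name, the (base_offset, last_offset) batch representation, and the layout are all illustrative assumptions, not anything from the KIP:

```python
def resolve_logical_address(batches, after_offset, skip, fetch):
    """Resolve "after offset X, skip N batches, fetch M batches".

    batches: list of (base_offset, last_offset) pairs in offset order,
    as only the partition leader knows them; the coordinator only
    issues the logical address.
    """
    # Batches whose first record lies strictly beyond the known offset.
    following = [b for b in batches if b[0] > after_offset]
    # Skip batches already handed to other consumers, take the next few.
    return following[skip:skip + fetch]
```

The share-group coordinator would hand out such addresses for the "unknown" tail of the partition, while acknowledgements would still carry exact offsets.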