Hi Andrew,

I think the "pending" state could be the solution for reading beyond the
LSO. Pending could indicate that a message is not yet available for
consumption (so they won't be offered for consumers), but with transactions
ending, they can become "available". With a pending state, records wouldn't
"disappear", they would simply not show up until they become available on
commit, or archived on abort.

Regardless, I understand that this might be some extra, unwanted
complexity, I just thought that with the message ordering guarantee gone,
it would be a cool feature for share-groups. I've seen use-cases where the
LSO being blocked for an extended period of time caused huge lag for
traditional read_committed consumers, which could be completely avoided by
share-groups.

Thanks,
Daniel

Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta (időpont:
2023. jún. 7., Sze, 17:28):

> Hi Daniel,
> Kind of. I don’t want a transaction abort to cause disappearance of
> records which are already in-flight. A “pending” state doesn’t seem
> helpful for read_committed. There’s no such disappearance problem
> for read_uncommitted.
>
> Thanks,
> Andrew
>
> > On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
> >
> > Hi Andrew,
> >
> > I agree with having a single isolation.level for the whole group, it
> makes
> > sense.
> > As for:
> > "b) The default isolation level for a share group is read_committed, in
> > which case
> > the SPSO and SPEO cannot move past the LSO."
> >
> > With this limitation (SPEO not moving beyond LSO), are you trying to
> avoid
> > handling the complexity of some kind of a "pending" state for the
> > uncommitted in-flight messages?
> >
> > Thanks,
> > Daniel
> >
> > Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta (időpont:
> > 2023. jún. 7., Sze, 16:52):
> >
> >> HI Daniel,
> >> I’ve been thinking about this question and I think this area is a bit
> >> tricky.
> >>
> >> If there are some consumers in a share group with isolation level
> >> read_uncommitted
> >> and other consumers with read_committed, they have different
> expectations
> >> with
> >> regards to which messages are visible when EOS comes into the picture.
> >> It seems to me that this is not necessarily a good thing.
> >>
> >> One option would be to support just read_committed in KIP-932. This
> means
> >> it is unambiguous which records are in-flight, because they’re only
> >> committed
> >> ones.
> >>
> >> Another option would be to have the entire share group have an isolation
> >> level,
> >> which again gives an unambiguous set of in-flight records but without
> the
> >> restriction of permitting just read_committed behaviour.
> >>
> >> So, my preference is for the following:
> >> a) A share group has an isolation level that applies to all consumers in
> >> the group.
> >> b) The default isolation level for a share group is read_committed, in
> >> which case
> >> the SPSO and SPEO cannot move past the LSO.
> >> c) For a share group with read_uncommited isolation level, the SPSO and
> >> SPEO
> >> can move past the LSO.
> >> d) The kafka_configs.sh tool or the AdminClient can be used to set a
> >> non-default
> >> value for the isolation level for a share group. The value is applied
> when
> >> the first
> >> member joins.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
> >>>
> >>> Hi Andrew,
> >>> Thank you for the clarification. One follow-up to read_committed mode:
> >>> Taking the change in message ordering guarantees into account, does
> this
> >>> mean that in queues, share-group consumers will be able to consume
> >>> committed records AFTER the LSO?
> >>> Thanks,
> >>> Daniel
> >>>
> >>> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta
> (időpont:
> >>> 2023. jún. 2., P, 10:39):
> >>>
> >>>> Hi Daniel,
> >>>> Thanks for your questions.
> >>>>
> >>>> 1) Yes, read_committed fetch will still be possible.
> >>>>
> >>>> 2) You weren’t wrong that this is a broad question :)
> >>>>
> >>>> Broadly speaking, I can see two ways of managing the in-flight
> records:
> >>>> the share-partition leader does it, or the share-group coordinator
> does
> >> it.
> >>>> I want to choose what works best, and I happen to have started with
> >> trying
> >>>> the share-partition leader doing it. This is just a whiteboard
> exercise
> >> at
> >>>> the
> >>>> moment, looking at the potential protocol flows and how well it all
> >> hangs
> >>>> together. When I have something coherent and understandable and worth
> >>>> reviewing, I’ll update the KIP with a proposal.
> >>>>
> >>>> I think it’s probably worth doing a similar exercise for the
> share-group
> >>>> coordinator way too. There are bound to be pros and cons, and I don’t
> >>>> really
> >>>> mind which way prevails.
> >>>>
> >>>> If the share-group coordinator does it, I already have experience of
> >>>> efficient
> >>>> storage of in-flight record state in a way that scales and is
> >>>> space-efficient.
> >>>> If the share-partition leader does it, storage of in-flight state is a
> >> bit
> >>>> more
> >>>> tricky.
> >>>>
> >>>> I think it’s worth thinking ahead to how EOS will work and also
> another
> >>>> couple of enhancements (key-based ordering and acquisition lock
> >>>> extension) so it’s somewhat future-proof.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Andrew,
> >>>>>
> >>>>> Thank you for the KIP, exciting work you are doing :)
> >>>>> I have 2 questions:
> >>>>> 1. I understand that EOS won't be supported for share-groups (yet),
> but
> >>>>> read_committed fetch will still be possible, correct?
> >>>>>
> >>>>> 2. I have a very broad question about the proposed solution: why not
> >> let
> >>>>> the share-group coordinator manage the states of the in-flight
> records?
> >>>>> I'm asking this because it seems to me that using the same pattern as
> >> the
> >>>>> existing group coordinator would
> >>>>> a, solve the durability of the message state storage (same method as
> >> the
> >>>>> one used by the current group coordinator)
> >>>>>
> >>>>> b, pave the way for EOS with share-groups (same method as the one
> used
> >> by
> >>>>> the current group coordinator)
> >>>>>
> >>>>> c, allow follower-fetching
> >>>>> I saw your point about this: "FFF gives freedom to fetch records
> from a
> >>>>> nearby broker, but it does not also give the ability to commit
> offsets
> >>>> to a
> >>>>> nearby broker"
> >>>>> But does it matter if message acknowledgement is not "local"?
> >> Supposedly,
> >>>>> fetching is the actual hard work which benefits from follower
> fetching,
> >>>> not
> >>>>> the group related requests.
> >>>>>
> >>>>> The only problem I see with the share-group coordinator managing the
> >>>>> in-flight message state is that the coordinator is not aware of the
> >> exact
> >>>>> available offsets of a partition, nor how the messages are batched.
> For
> >>>>> this problem, maybe the share group coordinator could use some form
> of
> >>>>> "logical" addresses, such as "the next 2 batches after offset X", or
> >>>> "after
> >>>>> offset X, skip 2 batches, fetch next 2". Acknowledgements always
> >> contain
> >>>>> the exact offset, but for the "unknown" sections of a partition,
> these
> >>>>> logical addresses would be used. The coordinator could keep track of
> >>>>> message states with a mix of offsets and these batch based addresses.
> >> The
> >>>>> partition leader could support "skip X, fetch Y batches" fetch
> >> requests.
> >>>>> This solution would need changes in the Fetch API to allow such batch
> >>>> based
> >>>>> addresses, but I assume that fetch protocol changes will be needed
> >>>>> regardless of the specific solution.
> >>>>>
> >>>>> Thanks,
> >>>>> Daniel
> >>>>>
> >>>>> Andrew Schofield <andrew_schofi...@live.com> ezt írta (időpont:
> 2023.
> >>>> máj.
> >>>>> 30., K, 18:15):
> >>>>>
> >>>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing
> >> the
> >>>>>> share group
> >>>>>> membership, and consumers that fetch records from their assigned
> >>>>>> partitions and
> >>>>>> acknowledge when delivery completes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
> >>>>>>>
> >>>>>>> Thanks for the explanation!
> >>>>>>>
> >>>>>>> So effectively, a share group is subscribed to each partition - but
> >> the
> >>>>>> data is not pushed to the consumer, but only sent on demand. And
> when
> >>>>>> demand is signalled, a batch of messages is sent?
> >>>>>>> Hence it would be up to the consumer to prefetch a sufficient
> number
> >> of
> >>>>>> batches to ensure, that it will never be "bored"?
> >>>>>>>
> >>>>>>> Adam
> >>>>>>>
> >>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <
> >> andrew_schofi...@live.com
> >>>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi Adam,
> >>>>>>>> Thanks for your question.
> >>>>>>>>
> >>>>>>>> With a share group, each fetch is able to grab available records
> >> from
> >>>>>> any partition. So, it alleviates
> >>>>>>>> the “head-of-line” blocking problem where a slow consumer gets in
> >> the
> >>>>>> way. There’s no actual
> >>>>>>>> stealing from a slow consumer, but it can be overtaken and must
> >>>>>> complete its processing within
> >>>>>>>> the timeout.
> >>>>>>>>
> >>>>>>>> The way I see this working is that when a consumer joins a share
> >>>> group,
> >>>>>> it receives a set of
> >>>>>>>> assigned share-partitions. To start with, every consumer will be
> >>>>>> assigned all partitions. We
> >>>>>>>> can be smarter than that, but I think that’s really a question of
> >>>>>> writing a smarter assignor
> >>>>>>>> just as has occurred over the years with consumer groups.
> >>>>>>>>
> >>>>>>>> Only a small proportion of Kafka workloads are super high
> >> throughput.
> >>>>>> Share groups would
> >>>>>>>> struggle with those I’m sure. Share groups do not diminish the
> value
> >>>> of
> >>>>>> consumer groups
> >>>>>>>> for streaming. They just give another option for situations where
> a
> >>>>>> different style of
> >>>>>>>> consumption is more appropriate.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello,
> >>>>>>>>>
> >>>>>>>>> thank you for the proposal! A very interesting read.
> >>>>>>>>>
> >>>>>>>>> I do have one question, though. When you subscribe to a topic
> using
> >>>>>> consumer groups, it might happen that one consumer has processed all
> >>>>>> messages from its partitions, while another one still has a lot of
> >> work
> >>>> to
> >>>>>> do (this might be due to unbalanced partitioning, long processing
> >> times
> >>>>>> etc.). In a message-queue approach, it would be great to solve this
> >>>> problem
> >>>>>> - so that a consumer that is free can steal work from other
> consumers.
> >>>> Is
> >>>>>> this somehow covered by share groups?
> >>>>>>>>>
> >>>>>>>>> Maybe this is planned as "further work", as indicated here:
> >>>>>>>>>
> >>>>>>>>> "
> >>>>>>>>> It manages the topic-partition assignments for the share-group
> >>>>>> members. An initial, trivial implementation would be to give each
> >> member
> >>>>>> the list of all topic-partitions which matches its subscriptions and
> >>>> then
> >>>>>> use the pull-based protocol to fetch records from all partitions. A
> >> more
> >>>>>> sophisticated implementation could use topic-partition load and lag
> >>>> metrics
> >>>>>> to distribute partitions among the consumers as a kind of
> autonomous,
> >>>>>> self-balancing partition assignment, steering more consumers to
> busier
> >>>>>> partitions, for example. Alternatively, a push-based fetching scheme
> >>>> could
> >>>>>> be used. Protocol details will follow later.
> >>>>>>>>> "
> >>>>>>>>>
> >>>>>>>>> but I’m not sure if I understand this correctly. A
> fully-connected
> >>>>>> graph seems like a lot of connections, and I’m not sure if this
> would
> >>>> play
> >>>>>> well with streaming.
> >>>>>>>>>
> >>>>>>>>> This also seems as one of the central problems - a key
> >> differentiator
> >>>>>> between share and consumer groups (the other one being persisting
> >> state
> >>>> of
> >>>>>> messages). And maybe the exact way we’d want to approach this would,
> >> to
> >>>> a
> >>>>>> certain degree, dictate the design of the queueing system?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Adam Warski
> >>>>>>>>>
> >>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for
> >>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable
> >>>>>> cooperative consumption by consumers without partition assignment.
> You
> >>>> end
> >>>>>> up with queue semantics on top of regular Kafka topics, with
> >> per-message
> >>>>>> acknowledgement and automatic handling of messages which repeatedly
> >>>> fail to
> >>>>>> be processed.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
> >>>>>>>>>>
> >>>>>>>>>> Please take a look and let me know what you think.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>> Andrew
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Adam Warski
> >>>>>>>
> >>>>>>> https://www.softwaremill.com/
> >>>>>>> https://twitter.com/adamwarski
>
>
>

Reply via email to