Hi Daniel,
True, I see your point. It’s analogous to a KafkaConsumer fetching
uncommitted records but not delivering them to the application.
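To make the analogy concrete, here is a rough plain-Java sketch (hypothetical types, not the real client internals) of what a read_committed KafkaConsumer effectively does: it fetches records written by aborted transactions too, but filters them out before delivering anything to the application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model: each fetched record carries the producerId of the
// transaction that wrote it. The real KafkaConsumer uses aborted-transaction
// metadata from the FetchResponse; this is only an illustration.
public class ReadCommittedFilter {
    record FetchedRecord(long offset, long producerId, String value) {}

    // Deliver only records whose producing transaction was not aborted.
    static List<FetchedRecord> deliverable(List<FetchedRecord> fetched,
                                           Set<Long> abortedProducerIds) {
        List<FetchedRecord> out = new ArrayList<>();
        for (FetchedRecord r : fetched) {
            if (!abortedProducerIds.contains(r.producerId())) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<FetchedRecord> fetched = List.of(
            new FetchedRecord(0, 1L, "a"),   // committed transaction
            new FetchedRecord(1, 2L, "b"),   // aborted transaction
            new FetchedRecord(2, 1L, "c"));
        // Three records were fetched, but only two reach the application.
        List<FetchedRecord> visible = deliverable(fetched, Set.of(2L));
        System.out.println(visible.size()); // prints 2
    }
}
```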

Thanks,
Andrew

> On 7 Jun 2023, at 16:38, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
>
> I think the "pending" state could be the solution for reading beyond the
> LSO. Pending could indicate that a message is not yet available for
> consumption (so it won't be offered to consumers), but when its
> transaction ends, it can become "available". With a pending state, records
> wouldn't "disappear"; they would simply not show up until they become
> available on commit, or archived on abort.
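A minimal sketch of the state machine described here, with made-up names (PENDING/AVAILABLE/ARCHIVED are illustrative, not from KIP-932): records appended by an open transaction are pending and never offered; the end of the transaction flips them to available (commit) or archived (abort).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "pending" idea for a share-partition.
// Names and structure are illustrative only, not part of KIP-932.
public class PendingState {
    enum State { PENDING, AVAILABLE, ARCHIVED }

    private final Map<Long, State> byOffset = new HashMap<>();

    // A transactional producer appends a record: it starts out PENDING.
    void onTransactionalAppend(long offset) {
        byOffset.put(offset, State.PENDING);
    }

    // The transaction that wrote this offset ends with commit or abort.
    void onTransactionEnd(long offset, boolean committed) {
        byOffset.computeIfPresent(offset,
            (o, s) -> committed ? State.AVAILABLE : State.ARCHIVED);
    }

    // Only AVAILABLE records are offered to consumers.
    boolean offerable(long offset) {
        return byOffset.get(offset) == State.AVAILABLE;
    }

    public static void main(String[] args) {
        PendingState p = new PendingState();
        p.onTransactionalAppend(10);
        p.onTransactionalAppend(11);
        System.out.println(p.offerable(10)); // false: still pending
        p.onTransactionEnd(10, true);        // commit
        p.onTransactionEnd(11, false);       // abort
        System.out.println(p.offerable(10)); // true: available after commit
        System.out.println(p.offerable(11)); // false: archived after abort
    }
}
```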
>
> Regardless, I understand that this might be some extra, unwanted
> complexity, I just thought that with the message ordering guarantee gone,
> it would be a cool feature for share-groups. I've seen use-cases where the
> LSO being blocked for an extended period of time caused huge lag for
> traditional read_committed consumers, which could be completely avoided by
> share-groups.
>
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed, 7 Jun
> 2023 at 17:28):
>
>> Hi Daniel,
>> Kind of. I don’t want a transaction abort to cause disappearance of
>> records which are already in-flight. A “pending” state doesn’t seem
>> helpful for read_committed. There’s no such disappearance problem
>> for read_uncommitted.
>>
>> Thanks,
>> Andrew
>>
>>> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>>
>>> I agree with having a single isolation.level for the whole group, it
>>> makes sense.
>>> As for:
>>> "b) The default isolation level for a share group is read_committed, in
>>> which case
>>> the SPSO and SPEO cannot move past the LSO."
>>>
>>> With this limitation (SPEO not moving beyond LSO), are you trying to
>>> avoid handling the complexity of some kind of a "pending" state for the
>>> uncommitted in-flight messages?
>>>
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed, 7
>>> Jun 2023 at 16:52):
>>>
>>>> Hi Daniel,
>>>> I’ve been thinking about this question and I think this area is a bit
>>>> tricky.
>>>>
>>>> If there are some consumers in a share group with isolation level
>>>> read_uncommitted and other consumers with read_committed, they have
>>>> different expectations with regards to which messages are visible when
>>>> EOS comes into the picture. It seems to me that this is not necessarily
>>>> a good thing.
>>>>
>>>> One option would be to support just read_committed in KIP-932. This
>>>> means it is unambiguous which records are in-flight, because they’re
>>>> only committed ones.
>>>>
>>>> Another option would be to have the entire share group have an
>>>> isolation level, which again gives an unambiguous set of in-flight
>>>> records but without the restriction of permitting just read_committed
>>>> behaviour.
>>>>
>>>> So, my preference is for the following:
>>>> a) A share group has an isolation level that applies to all consumers
>>>> in the group.
>>>> b) The default isolation level for a share group is read_committed, in
>>>> which case the SPSO and SPEO cannot move past the LSO.
>>>> c) For a share group with read_uncommitted isolation level, the SPSO
>>>> and SPEO can move past the LSO.
>>>> d) The kafka-configs.sh tool or the AdminClient can be used to set a
>>>> non-default value for the isolation level for a share group. The value
>>>> is applied when the first member joins.
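If (d) lands, setting the non-default value might look roughly like the following; the config key `group.share.isolation.level` is a placeholder for illustration, not a name defined in the KIP:

```shell
# Hypothetical: set a non-default isolation level for a share group before
# its first member joins. The config key name is a placeholder.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type groups --entity-name my-share-group \
  --add-config group.share.isolation.level=read_uncommitted
```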
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>> Thank you for the clarification. One follow-up to read_committed mode:
>>>>> Taking the change in message ordering guarantees into account, does
>> this
>>>>> mean that in queues, share-group consumers will be able to consume
>>>>> committed records AFTER the LSO?
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Fri, 2
>>>>> Jun 2023 at 10:39):
>>>>>
>>>>>> Hi Daniel,
>>>>>> Thanks for your questions.
>>>>>>
>>>>>> 1) Yes, read_committed fetch will still be possible.
>>>>>>
>>>>>> 2) You weren’t wrong that this is a broad question :)
>>>>>>
>>>>>> Broadly speaking, I can see two ways of managing the in-flight
>>>>>> records: the share-partition leader does it, or the share-group
>>>>>> coordinator does it. I want to choose what works best, and I happen
>>>>>> to have started with trying the share-partition leader doing it. This
>>>>>> is just a whiteboard exercise at the moment, looking at the potential
>>>>>> protocol flows and how well it all hangs together. When I have
>>>>>> something coherent and understandable and worth reviewing, I’ll
>>>>>> update the KIP with a proposal.
>>>>>>
>>>>>> I think it’s probably worth doing a similar exercise for the
>>>>>> share-group coordinator way too. There are bound to be pros and cons,
>>>>>> and I don’t really mind which way prevails.
>>>>>>
>>>>>> If the share-group coordinator does it, I already have experience of
>>>>>> efficient storage of in-flight record state in a way that scales and
>>>>>> is space-efficient. If the share-partition leader does it, storage of
>>>>>> in-flight state is a bit more tricky.
>>>>>>
>>>>>> I think it’s worth thinking ahead to how EOS will work and also
>>>>>> another couple of enhancements (key-based ordering and acquisition
>>>>>> lock extension) so it’s somewhat future-proof.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>>>> I have 2 questions:
>>>>>>> 1. I understand that EOS won't be supported for share-groups (yet),
>>>>>>> but read_committed fetch will still be possible, correct?
>>>>>>>
>>>>>>> 2. I have a very broad question about the proposed solution: why
>>>>>>> not let the share-group coordinator manage the states of the
>>>>>>> in-flight records?
>>>>>>> I'm asking this because it seems to me that using the same pattern
>>>>>>> as the existing group coordinator would
>>>>>>> a, solve the durability of the message state storage (same method
>>>>>>> as the one used by the current group coordinator)
>>>>>>>
>>>>>>> b, pave the way for EOS with share-groups (same method as the one
>>>>>>> used by the current group coordinator)
>>>>>>>
>>>>>>> c, allow follower-fetching
>>>>>>> I saw your point about this: "FFF gives freedom to fetch records
>>>>>>> from a nearby broker, but it does not also give the ability to
>>>>>>> commit offsets to a nearby broker"
>>>>>>> But does it matter if message acknowledgement is not "local"?
>>>>>>> Supposedly, fetching is the actual hard work which benefits from
>>>>>>> follower fetching, not the group related requests.
>>>>>>>
>>>>>>> The only problem I see with the share-group coordinator managing
>>>>>>> the in-flight message state is that the coordinator is not aware of
>>>>>>> the exact available offsets of a partition, nor how the messages
>>>>>>> are batched. For this problem, maybe the share-group coordinator
>>>>>>> could use some form of "logical" addresses, such as "the next 2
>>>>>>> batches after offset X", or "after offset X, skip 2 batches, fetch
>>>>>>> next 2". Acknowledgements always contain the exact offset, but for
>>>>>>> the "unknown" sections of a partition, these logical addresses
>>>>>>> would be used. The coordinator could keep track of message states
>>>>>>> with a mix of offsets and these batch-based addresses. The
>>>>>>> partition leader could support "skip X, fetch Y batches" fetch
>>>>>>> requests. This solution would need changes in the Fetch API to
>>>>>>> allow such batch-based addresses, but I assume that fetch protocol
>>>>>>> changes will be needed regardless of the specific solution.
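A small sketch of how a partition leader might resolve such a logical address ("after offset X, skip S batches, fetch N batches") against its known batch boundaries; all names here are illustrative, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "logical" batch-address idea: the coordinator refers to
// log ranges it has not seen as "after offset X, skip S batches, fetch N
// batches", and the partition leader resolves that against the real batch
// boundaries. Names are illustrative only.
public class BatchAddressResolver {
    // A batch is the inclusive offset range [baseOffset, lastOffset].
    record Batch(long baseOffset, long lastOffset) {}

    // Resolve "after offset X, skip S batches, fetch N batches".
    static List<Batch> resolve(List<Batch> log, long afterOffset,
                               int skip, int fetch) {
        List<Batch> out = new ArrayList<>();
        int skipped = 0;
        for (Batch b : log) {
            if (b.baseOffset() <= afterOffset) continue; // before the anchor
            if (skipped < skip) { skipped++; continue; } // skip S batches
            if (out.size() == fetch) break;              // fetched N batches
            out.add(b);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Batch> log = List.of(
            new Batch(0, 4), new Batch(5, 9), new Batch(10, 14),
            new Batch(15, 19), new Batch(20, 24));
        // "after offset 4, skip 1 batch, fetch 2 batches"
        List<Batch> r = resolve(log, 4, 1, 2);
        System.out.println(r.get(0).baseOffset() + "-" + r.get(1).baseOffset()); // prints 10-15
    }
}
```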
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Daniel
>>>>>>>
>>>>>>> Andrew Schofield <andrew_schofi...@live.com> wrote (on Tue, 30 May
>>>>>>> 2023 at 18:15):
>>>>>>>
>>>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for
>>>>>>>> managing the share group membership, and consumers that fetch
>>>>>>>> records from their assigned partitions and acknowledge when
>>>>>>>> delivery completes.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for the explanation!
>>>>>>>>>
>>>>>>>>> So effectively, a share group is subscribed to each partition -
>>>>>>>>> but the data is not pushed to the consumer, but only sent on
>>>>>>>>> demand. And when demand is signalled, a batch of messages is sent?
>>>>>>>>> Hence it would be up to the consumer to prefetch a sufficient
>>>>>>>>> number of batches to ensure that it will never be "bored"?
>>>>>>>>>
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Adam,
>>>>>>>>>> Thanks for your question.
>>>>>>>>>>
>>>>>>>>>> With a share group, each fetch is able to grab available records
>>>>>>>>>> from any partition. So, it alleviates the “head-of-line” blocking
>>>>>>>>>> problem where a slow consumer gets in the way. There’s no actual
>>>>>>>>>> stealing from a slow consumer, but it can be overtaken and must
>>>>>>>>>> complete its processing within the timeout.
>>>>>>>>>>
>>>>>>>>>> The way I see this working is that when a consumer joins a share
>>>>>>>>>> group, it receives a set of assigned share-partitions. To start
>>>>>>>>>> with, every consumer will be assigned all partitions. We can be
>>>>>>>>>> smarter than that, but I think that’s really a question of
>>>>>>>>>> writing a smarter assignor just as has occurred over the years
>>>>>>>>>> with consumer groups.
>>>>>>>>>>
>>>>>>>>>> Only a small proportion of Kafka workloads are super high
>>>>>>>>>> throughput. Share groups would struggle with those, I’m sure.
>>>>>>>>>> Share groups do not diminish the value of consumer groups for
>>>>>>>>>> streaming. They just give another option for situations where a
>>>>>>>>>> different style of consumption is more appropriate.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>>>
>>>>>>>>>>> I do have one question, though. When you subscribe to a topic
>>>>>>>>>>> using consumer groups, it might happen that one consumer has
>>>>>>>>>>> processed all messages from its partitions, while another one
>>>>>>>>>>> still has a lot of work to do (this might be due to unbalanced
>>>>>>>>>>> partitioning, long processing times etc.). In a message-queue
>>>>>>>>>>> approach, it would be great to solve this problem - so that a
>>>>>>>>>>> consumer that is free can steal work from other consumers. Is
>>>>>>>>>>> this somehow covered by share groups?
>>>>>>>>>>>
>>>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>>>
>>>>>>>>>>> "
>>>>>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>>>>>>> members. An initial, trivial implementation would be to give
>>>>>>>>>>> each member the list of all topic-partitions which matches its
>>>>>>>>>>> subscriptions and then use the pull-based protocol to fetch
>>>>>>>>>>> records from all partitions. A more sophisticated
>>>>>>>>>>> implementation could use topic-partition load and lag metrics
>>>>>>>>>>> to distribute partitions among the consumers as a kind of
>>>>>>>>>>> autonomous, self-balancing partition assignment, steering more
>>>>>>>>>>> consumers to busier partitions, for example. Alternatively, a
>>>>>>>>>>> push-based fetching scheme could be used. Protocol details will
>>>>>>>>>>> follow later.
>>>>>>>>>>> "
>>>>>>>>>>>
>>>>>>>>>>> but I’m not sure if I understand this correctly. A
>>>>>>>>>>> fully-connected graph seems like a lot of connections, and I’m
>>>>>>>>>>> not sure if this would play well with streaming.
>>>>>>>>>>>
>>>>>>>>>>> This also seems like one of the central problems - a key
>>>>>>>>>>> differentiator between share and consumer groups (the other one
>>>>>>>>>>> being persisting the state of messages). And maybe the exact
>>>>>>>>>>> way we’d want to approach this would, to a certain degree,
>>>>>>>>>>> dictate the design of the queueing system?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Adam Warski
>>>>>>>>>>>
>>>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues
>>>>>>>>>>>> for Kafka. This KIP proposes an alternative to consumer groups
>>>>>>>>>>>> to enable cooperative consumption by consumers without
>>>>>>>>>>>> partition assignment. You end up with queue semantics on top
>>>>>>>>>>>> of regular Kafka topics, with per-message acknowledgement and
>>>>>>>>>>>> automatic handling of messages which repeatedly fail to be
>>>>>>>>>>>> processed.
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>>>
>>>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> Andrew
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Adam Warski
>>>>>>>>>
>>>>>>>>> https://www.softwaremill.com/
>>>>>>>>> https://twitter.com/adamwarski

