Hi Daniel,
Kind of. I don’t want a transaction abort to cause disappearance of
records which are already in-flight. A “pending” state doesn’t seem
helpful for read_committed. There’s no such disappearance problem
for read_uncommitted.

Thanks,
Andrew

> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
>
> I agree with having a single isolation.level for the whole group, it makes
> sense.
> As for:
> "b) The default isolation level for a share group is read_committed, in
> which case
> the SPSO and SPEO cannot move past the LSO."
>
> With this limitation (SPEO not moving beyond LSO), are you trying to avoid
> handling the complexity of some kind of a "pending" state for the
> uncommitted in-flight messages?
>
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta (időpont:
> 2023. jún. 7., Sze, 16:52):
>
>> HI Daniel,
>> I’ve been thinking about this question and I think this area is a bit
>> tricky.
>>
>> If there are some consumers in a share group with isolation level
>> read_uncommitted
>> and other consumers with read_committed, they have different expectations
>> with
>> regards to which messages are visible when EOS comes into the picture.
>> It seems to me that this is not necessarily a good thing.
>>
>> One option would be to support just read_committed in KIP-932. This means
>> it is unambiguous which records are in-flight, because they’re only
>> committed
>> ones.
>>
>> Another option would be to have the entire share group have an isolation
>> level,
>> which again gives an unambiguous set of in-flight records but without the
>> restriction of permitting just read_committed behaviour.
>>
>> So, my preference is for the following:
>> a) A share group has an isolation level that applies to all consumers in
>> the group.
>> b) The default isolation level for a share group is read_committed, in
>> which case
>> the SPSO and SPEO cannot move past the LSO.
>> c) For a share group with read_uncommited isolation level, the SPSO and
>> SPEO
>> can move past the LSO.
>> d) The kafka_configs.sh tool or the AdminClient can be used to set a
>> non-default
>> value for the isolation level for a share group. The value is applied when
>> the first
>> member joins.
>>
>> Thanks,
>> Andrew
>>
>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>> Thank you for the clarification. One follow-up to read_committed mode:
>>> Taking the change in message ordering guarantees into account, does this
>>> mean that in queues, share-group consumers will be able to consume
>>> committed records AFTER the LSO?
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta (időpont:
>>> 2023. jún. 2., P, 10:39):
>>>
>>>> Hi Daniel,
>>>> Thanks for your questions.
>>>>
>>>> 1) Yes, read_committed fetch will still be possible.
>>>>
>>>> 2) You weren’t wrong that this is a broad question :)
>>>>
>>>> Broadly speaking, I can see two ways of managing the in-flight records:
>>>> the share-partition leader does it, or the share-group coordinator does
>> it.
>>>> I want to choose what works best, and I happen to have started with
>> trying
>>>> the share-partition leader doing it. This is just a whiteboard exercise
>> at
>>>> the
>>>> moment, looking at the potential protocol flows and how well it all
>> hangs
>>>> together. When I have something coherent and understandable and worth
>>>> reviewing, I’ll update the KIP with a proposal.
>>>>
>>>> I think it’s probably worth doing a similar exercise for the share-group
>>>> coordinator way too. There are bound to be pros and cons, and I don’t
>>>> really
>>>> mind which way prevails.
>>>>
>>>> If the share-group coordinator does it, I already have experience of
>>>> efficient
>>>> storage of in-flight record state in a way that scales and is
>>>> space-efficient.
>>>> If the share-partition leader does it, storage of in-flight state is a
>> bit
>>>> more
>>>> tricky.
>>>>
>>>> I think it’s worth thinking ahead to how EOS will work and also another
>>>> couple of enhancements (key-based ordering and acquisition lock
>>>> extension) so it’s somewhat future-proof.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>> I have 2 questions:
>>>>> 1. I understand that EOS won't be supported for share-groups (yet), but
>>>>> read_committed fetch will still be possible, correct?
>>>>>
>>>>> 2. I have a very broad question about the proposed solution: why not
>> let
>>>>> the share-group coordinator manage the states of the in-flight records?
>>>>> I'm asking this because it seems to me that using the same pattern as
>> the
>>>>> existing group coordinator would
>>>>> a, solve the durability of the message state storage (same method as
>> the
>>>>> one used by the current group coordinator)
>>>>>
>>>>> b, pave the way for EOS with share-groups (same method as the one used
>> by
>>>>> the current group coordinator)
>>>>>
>>>>> c, allow follower-fetching
>>>>> I saw your point about this: "FFF gives freedom to fetch records from a
>>>>> nearby broker, but it does not also give the ability to commit offsets
>>>> to a
>>>>> nearby broker"
>>>>> But does it matter if message acknowledgement is not "local"?
>> Supposedly,
>>>>> fetching is the actual hard work which benefits from follower fetching,
>>>> not
>>>>> the group related requests.
>>>>>
>>>>> The only problem I see with the share-group coordinator managing the
>>>>> in-flight message state is that the coordinator is not aware of the
>> exact
>>>>> available offsets of a partition, nor how the messages are batched. For
>>>>> this problem, maybe the share group coordinator could use some form of
>>>>> "logical" addresses, such as "the next 2 batches after offset X", or
>>>> "after
>>>>> offset X, skip 2 batches, fetch next 2". Acknowledgements always
>> contain
>>>>> the exact offset, but for the "unknown" sections of a partition, these
>>>>> logical addresses would be used. The coordinator could keep track of
>>>>> message states with a mix of offsets and these batch based addresses.
>> The
>>>>> partition leader could support "skip X, fetch Y batches" fetch
>> requests.
>>>>> This solution would need changes in the Fetch API to allow such batch
>>>> based
>>>>> addresses, but I assume that fetch protocol changes will be needed
>>>>> regardless of the specific solution.
>>>>>
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>> Andrew Schofield <andrew_schofi...@live.com> ezt írta (időpont: 2023.
>>>> máj.
>>>>> 30., K, 18:15):
>>>>>
>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing
>> the
>>>>>> share group
>>>>>> membership, and consumers that fetch records from their assigned
>>>>>> partitions and
>>>>>> acknowledge when delivery completes.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>
>>>>>>> Thanks for the explanation!
>>>>>>>
>>>>>>> So effectively, a share group is subscribed to each partition - but
>> the
>>>>>> data is not pushed to the consumer, but only sent on demand. And when
>>>>>> demand is signalled, a batch of messages is sent?
>>>>>>> Hence it would be up to the consumer to prefetch a sufficient number
>> of
>>>>>> batches to ensure, that it will never be "bored"?
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <
>> andrew_schofi...@live.com
>>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Adam,
>>>>>>>> Thanks for your question.
>>>>>>>>
>>>>>>>> With a share group, each fetch is able to grab available records
>> from
>>>>>> any partition. So, it alleviates
>>>>>>>> the “head-of-line” blocking problem where a slow consumer gets in
>> the
>>>>>> way. There’s no actual
>>>>>>>> stealing from a slow consumer, but it can be overtaken and must
>>>>>> complete its processing within
>>>>>>>> the timeout.
>>>>>>>>
>>>>>>>> The way I see this working is that when a consumer joins a share
>>>> group,
>>>>>> it receives a set of
>>>>>>>> assigned share-partitions. To start with, every consumer will be
>>>>>> assigned all partitions. We
>>>>>>>> can be smarter than that, but I think that’s really a question of
>>>>>> writing a smarter assignor
>>>>>>>> just as has occurred over the years with consumer groups.
>>>>>>>>
>>>>>>>> Only a small proportion of Kafka workloads are super high
>> throughput.
>>>>>> Share groups would
>>>>>>>> struggle with those I’m sure. Share groups do not diminish the value
>>>> of
>>>>>> consumer groups
>>>>>>>> for streaming. They just give another option for situations where a
>>>>>> different style of
>>>>>>>> consumption is more appropriate.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>
>>>>>>>>> I do have one question, though. When you subscribe to a topic using
>>>>>> consumer groups, it might happen that one consumer has processed all
>>>>>> messages from its partitions, while another one still has a lot of
>> work
>>>> to
>>>>>> do (this might be due to unbalanced partitioning, long processing
>> times
>>>>>> etc.). In a message-queue approach, it would be great to solve this
>>>> problem
>>>>>> - so that a consumer that is free can steal work from other consumers.
>>>> Is
>>>>>> this somehow covered by share groups?
>>>>>>>>>
>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>
>>>>>>>>> "
>>>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>> members. An initial, trivial implementation would be to give each
>> member
>>>>>> the list of all topic-partitions which matches its subscriptions and
>>>> then
>>>>>> use the pull-based protocol to fetch records from all partitions. A
>> more
>>>>>> sophisticated implementation could use topic-partition load and lag
>>>> metrics
>>>>>> to distribute partitions among the consumers as a kind of autonomous,
>>>>>> self-balancing partition assignment, steering more consumers to busier
>>>>>> partitions, for example. Alternatively, a push-based fetching scheme
>>>> could
>>>>>> be used. Protocol details will follow later.
>>>>>>>>> "
>>>>>>>>>
>>>>>>>>> but I’m not sure if I understand this correctly. A fully-connected
>>>>>> graph seems like a lot of connections, and I’m not sure if this would
>>>> play
>>>>>> well with streaming.
>>>>>>>>>
>>>>>>>>> This also seems as one of the central problems - a key
>> differentiator
>>>>>> between share and consumer groups (the other one being persisting
>> state
>>>> of
>>>>>> messages). And maybe the exact way we’d want to approach this would,
>> to
>>>> a
>>>>>> certain degree, dictate the design of the queueing system?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Adam Warski
>>>>>>>>>
>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for
>>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable
>>>>>> cooperative consumption by consumers without partition assignment. You
>>>> end
>>>>>> up with queue semantics on top of regular Kafka topics, with
>> per-message
>>>>>> acknowledgement and automatic handling of messages which repeatedly
>>>> fail to
>>>>>> be processed.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>
>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Adam Warski
>>>>>>>
>>>>>>> https://www.softwaremill.com/
>>>>>>> https://twitter.com/adamwarski


Reply via email to