Hi Kamal,
Thanks for your comments. Sorry for the delay. Just back from vacation.

101) When I was drafting the KIP, I did try an approach which allowed an
arbitrary order of acknowledgement in the consumer, but I wasn’t happy with
the usability.

The records delivered to a specific consumer in a batch must be acknowledged in
the order delivered in that batch. But the records on the share-partition as a
whole are not guaranteed to be delivered in order, because there might be
several consumers, retries and so on.
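
For the thread-pool case in the question, here is a minimal sketch of one way an
application could process a batch in parallel and still acknowledge in delivery
order. It is self-contained Java and does not use the Kafka client at all: the
class OrderedAckSketch, the AckLog helper and the method processBatch are
hypothetical names made up for illustration, and the batch is assumed to come
from a single share-partition so offsets increase through the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: none of these names come from the KIP or the Kafka client.
final class OrderedAckSketch {

    // Stand-in for the acknowledge call; rejects acknowledgements that go backwards.
    static final class AckLog {
        private final List<Long> acked = new ArrayList<>();
        private long lastAcked = -1L;

        void acknowledge(long offset) {
            if (offset <= lastAcked) {
                throw new IllegalStateException("out-of-order acknowledgement: " + offset);
            }
            lastAcked = offset;
            acked.add(offset);
        }

        List<Long> acked() {
            return List.copyOf(acked);
        }
    }

    // Processes the batch on a thread pool, then acknowledges in delivery order.
    static List<Long> processBatch(List<Long> offsets, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AckLog log = new AckLog();
        try {
            List<Future<Long>> inFlight = new ArrayList<>();
            for (final long offset : offsets) {
                // Simulated work; completion order across threads is arbitrary.
                Callable<Long> task = () -> {
                    Thread.sleep((offset % 3) * 5);
                    return offset;
                };
                inFlight.add(pool.submit(task));
            }
            // Waiting on the futures in submission order restores the batch order.
            for (Future<Long> done : inFlight) {
                log.acknowledge(done.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("record processing failed", e);
        } finally {
            pool.shutdown();
        }
        return log.acked();
    }
}
```

The point of the sketch is only that parallel processing and in-order
acknowledgement are compatible: the work finishes in any order, and the
acknowledgements are issued by walking the futures in the order the records
were delivered.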

102) I think that it would be useful in some situations to be able to extend
the lock timeout for records in order to buy extra time to process them. I see
that as a future extension to the Java interface, rather than changing the
configuration value that applies to all records acquired by the consumer.
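
To make the distinction concrete, here is a rough model of per-record lock
extension as opposed to changing a group-wide duration. It is purely
illustrative: RecordLockSketch and its methods are hypothetical and are not
part of the KIP's interface, and time is passed in explicitly as milliseconds
to keep the sketch deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-record acquisition locks; not an API from the KIP.
final class RecordLockSketch {
    // Plays the role of the lock duration that applies to every acquired record.
    private final long defaultLockMs;
    // Maps a record offset to the time (in ms) at which its lock expires.
    private final Map<Long, Long> deadlines = new HashMap<>();

    RecordLockSketch(long defaultLockMs) {
        this.defaultLockMs = defaultLockMs;
    }

    // Acquiring a record starts its lock with the default duration.
    void acquire(long offset, long nowMs) {
        deadlines.put(offset, nowMs + defaultLockMs);
    }

    // Extends one record's lock; other records keep their original deadlines.
    boolean extend(long offset, long nowMs, long extraMs) {
        Long deadline = deadlines.get(offset);
        if (deadline == null || deadline <= nowMs) {
            return false; // never acquired, or the lock has already expired
        }
        deadlines.put(offset, deadline + extraMs);
        return true;
    }

    boolean isLocked(long offset, long nowMs) {
        Long deadline = deadlines.get(offset);
        return deadline != null && deadline > nowMs;
    }
}
```

In this model a consumer that is slow on one record buys time for that record
only, while the default duration for everything else is left alone.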

103) I’m prototyping the code and will be able to answer the configuration
question properly quite soon. I expect almost all of the existing consumer
configs will apply to share-group consumers.

104) I think quotas will work the same for existing consumers and share-group
consumers.

Hope this helps.
Andrew

> On 1 Jul 2023, at 12:42, Kamal Chandraprakash 
> <kamal.chandraprak...@gmail.com> wrote:
>
> Hi Andrew,
>
> Thank you for the KIP -- interesting read. I have some questions:
>
> 101. "The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType)
> must be issued in the order in which the records appear in the
> ConsumerRecords object, which will be in order of increasing offset for each
> share-partition"
>
> If the share-consumer uses a thread pool internally and acknowledges the
> records out of order, will this use case be supported? The "Managing durable
> share-partition state" section has transitions where the records are
> acknowledged out of order, so I want to confirm this.
>
> 102. Will the configs be maintained at a fine-grained level, per topic and
> share group? Some share-consumer groups may want to increase
> "record.lock.duration.ms" dynamically if record processing is taking longer
> than usual during an external system outage or downtime.
>
> 103. Can we also define whether all the consumer configs are eligible for
> share-consumer groups? For example, the `max.poll.interval.ms` default is
> 5 mins. Will this config have any effect on the share consumers?
>
> 104. How will the consumer quota work? Will it be similar to the existing
> consumer quota mechanism?
>
> --
> Kamal
>
> On Wed, Jun 7, 2023 at 9:17 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
>> Hi Daniel,
>> True, I see your point. It’s analogous to a KafkaConsumer fetching
>> uncommitted records but not delivering them to the application.
>>
>> Thanks,
>> Andrew
>>
>>> On 7 Jun 2023, at 16:38, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>>
>>> I think the "pending" state could be the solution for reading beyond the
>>> LSO. Pending could indicate that a message is not yet available for
>>> consumption (so it won't be offered to consumers), but when transactions
>>> end, it can become "available". With a pending state, records wouldn't
>>> "disappear"; they would simply not show up until they become available on
>>> commit, or are archived on abort.
>>>
>>> Regardless, I understand that this might be some extra, unwanted
>>> complexity. I just thought that with the message ordering guarantee gone,
>>> it would be a cool feature for share-groups. I've seen use-cases where the
>>> LSO being blocked for an extended period of time caused huge lag for
>>> traditional read_committed consumers, which could be completely avoided by
>>> share-groups.
>>>
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed,
>>> 7 Jun 2023, 17:28):
>>>
>>>> Hi Daniel,
>>>> Kind of. I don’t want a transaction abort to cause disappearance of
>>>> records which are already in-flight. A “pending” state doesn’t seem
>>>> helpful for read_committed. There’s no such disappearance problem
>>>> for read_uncommitted.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I agree with having a single isolation.level for the whole group, it
>>>>> makes sense.
>>>>> As for:
>>>>> "b) The default isolation level for a share group is read_committed, in
>>>>> which case the SPSO and SPEO cannot move past the LSO."
>>>>>
>>>>> With this limitation (SPEO not moving beyond LSO), are you trying to
>>>>> avoid handling the complexity of some kind of a "pending" state for the
>>>>> uncommitted in-flight messages?
>>>>>
>>>>> Thanks,
>>>>> Daniel
>>>>>
>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed,
>>>>> 7 Jun 2023, 16:52):
>>>>>
>>>>>> Hi Daniel,
>>>>>> I’ve been thinking about this question and I think this area is a bit
>>>>>> tricky.
>>>>>>
>>>>>> If there are some consumers in a share group with isolation level
>>>>>> read_uncommitted and other consumers with read_committed, they have
>>>>>> different expectations with regards to which messages are visible when
>>>>>> EOS comes into the picture. It seems to me that this is not necessarily
>>>>>> a good thing.
>>>>>>
>>>>>> One option would be to support just read_committed in KIP-932. This
>>>>>> means it is unambiguous which records are in-flight, because they’re
>>>>>> only committed ones.
>>>>>>
>>>>>> Another option would be to have the entire share group have an
>>>>>> isolation level, which again gives an unambiguous set of in-flight
>>>>>> records but without the restriction of permitting just read_committed
>>>>>> behaviour.
>>>>>>
>>>>>> So, my preference is for the following:
>>>>>> a) A share group has an isolation level that applies to all consumers
>>>>>> in the group.
>>>>>> b) The default isolation level for a share group is read_committed, in
>>>>>> which case the SPSO and SPEO cannot move past the LSO.
>>>>>> c) For a share group with read_uncommitted isolation level, the SPSO
>>>>>> and SPEO can move past the LSO.
>>>>>> d) The kafka-configs.sh tool or the AdminClient can be used to set a
>>>>>> non-default value for the isolation level for a share group. The value
>>>>>> is applied when the first member joins.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>> Thank you for the clarification. One follow-up to read_committed
>>>>>>> mode: Taking the change in message ordering guarantees into account,
>>>>>>> does this mean that in queues, share-group consumers will be able to
>>>>>>> consume committed records AFTER the LSO?
>>>>>>> Thanks,
>>>>>>> Daniel
>>>>>>>
>>>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Fri,
>>>>>>> 2 Jun 2023, 10:39):
>>>>>>>
>>>>>>>> Hi Daniel,
>>>>>>>> Thanks for your questions.
>>>>>>>>
>>>>>>>> 1) Yes, read_committed fetch will still be possible.
>>>>>>>>
>>>>>>>> 2) You weren’t wrong that this is a broad question :)
>>>>>>>>
>>>>>>>> Broadly speaking, I can see two ways of managing the in-flight
>>>>>>>> records: the share-partition leader does it, or the share-group
>>>>>>>> coordinator does it. I want to choose what works best, and I happen
>>>>>>>> to have started with trying the share-partition leader doing it.
>>>>>>>> This is just a whiteboard exercise at the moment, looking at the
>>>>>>>> potential protocol flows and how well it all hangs together. When I
>>>>>>>> have something coherent and understandable and worth reviewing, I’ll
>>>>>>>> update the KIP with a proposal.
>>>>>>>>
>>>>>>>> I think it’s probably worth doing a similar exercise for the
>>>>>>>> share-group coordinator way too. There are bound to be pros and
>>>>>>>> cons, and I don’t really mind which way prevails.
>>>>>>>>
>>>>>>>> If the share-group coordinator does it, I already have experience of
>>>>>>>> efficient storage of in-flight record state in a way that scales and
>>>>>>>> is space-efficient. If the share-partition leader does it, storage
>>>>>>>> of in-flight state is a bit more tricky.
>>>>>>>>
>>>>>>>> I think it’s worth thinking ahead to how EOS will work and also
>>>>>>>> another couple of enhancements (key-based ordering and acquisition
>>>>>>>> lock extension) so it’s somewhat future-proof.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>>>>>> I have 2 questions:
>>>>>>>>> 1. I understand that EOS won't be supported for share-groups (yet),
>>>>>>>>> but read_committed fetch will still be possible, correct?
>>>>>>>>>
>>>>>>>>> 2. I have a very broad question about the proposed solution: why
>>>>>>>>> not let the share-group coordinator manage the states of the
>>>>>>>>> in-flight records?
>>>>>>>>> I'm asking this because it seems to me that using the same pattern
>>>>>>>>> as the existing group coordinator would
>>>>>>>>> a, solve the durability of the message state storage (same method
>>>>>>>>> as the one used by the current group coordinator)
>>>>>>>>>
>>>>>>>>> b, pave the way for EOS with share-groups (same method as the one
>>>>>>>>> used by the current group coordinator)
>>>>>>>>>
>>>>>>>>> c, allow follower-fetching
>>>>>>>>> I saw your point about this: "FFF gives freedom to fetch records
>>>>>>>>> from a nearby broker, but it does not also give the ability to
>>>>>>>>> commit offsets to a nearby broker"
>>>>>>>>> But does it matter if message acknowledgement is not "local"?
>>>>>>>>> Supposedly, fetching is the actual hard work which benefits from
>>>>>>>>> follower fetching, not the group related requests.
>>>>>>>>>
>>>>>>>>> The only problem I see with the share-group coordinator managing
>>>>>>>>> the in-flight message state is that the coordinator is not aware of
>>>>>>>>> the exact available offsets of a partition, nor how the messages
>>>>>>>>> are batched. For this problem, maybe the share group coordinator
>>>>>>>>> could use some form of "logical" addresses, such as "the next 2
>>>>>>>>> batches after offset X", or "after offset X, skip 2 batches, fetch
>>>>>>>>> next 2". Acknowledgements always contain the exact offset, but for
>>>>>>>>> the "unknown" sections of a partition, these logical addresses
>>>>>>>>> would be used. The coordinator could keep track of message states
>>>>>>>>> with a mix of offsets and these batch based addresses. The
>>>>>>>>> partition leader could support "skip X, fetch Y batches" fetch
>>>>>>>>> requests. This solution would need changes in the Fetch API to
>>>>>>>>> allow such batch based addresses, but I assume that fetch protocol
>>>>>>>>> changes will be needed regardless of the specific solution.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Daniel
>>>>>>>>>
>>>>>>>>> Andrew Schofield <andrew_schofi...@live.com> wrote (on Tue,
>>>>>>>>> 30 May 2023, 18:15):
>>>>>>>>>
>>>>>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for
>>>>>>>>>> managing the share group membership, and consumers that fetch
>>>>>>>>>> records from their assigned partitions and acknowledge when
>>>>>>>>>> delivery completes.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the explanation!
>>>>>>>>>>>
>>>>>>>>>>> So effectively, a share group is subscribed to each partition,
>>>>>>>>>>> but the data is not pushed to the consumer, only sent on demand.
>>>>>>>>>>> And when demand is signalled, a batch of messages is sent?
>>>>>>>>>>> Hence it would be up to the consumer to prefetch a sufficient
>>>>>>>>>>> number of batches to ensure that it will never be "bored"?
>>>>>>>>>>>
>>>>>>>>>>> Adam
>>>>>>>>>>>
>>>>>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield
>>>>>>>>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>>
>>>>>>>>>>>> With a share group, each fetch is able to grab available
>>>>>>>>>>>> records from any partition. So, it alleviates the
>>>>>>>>>>>> “head-of-line” blocking problem where a slow consumer gets in
>>>>>>>>>>>> the way. There’s no actual stealing from a slow consumer, but
>>>>>>>>>>>> it can be overtaken and must complete its processing within
>>>>>>>>>>>> the timeout.
>>>>>>>>>>>>
>>>>>>>>>>>> The way I see this working is that when a consumer joins a
>>>>>>>>>>>> share group, it receives a set of assigned share-partitions.
>>>>>>>>>>>> To start with, every consumer will be assigned all partitions.
>>>>>>>>>>>> We can be smarter than that, but I think that’s really a
>>>>>>>>>>>> question of writing a smarter assignor just as has occurred
>>>>>>>>>>>> over the years with consumer groups.
>>>>>>>>>>>>
>>>>>>>>>>>> Only a small proportion of Kafka workloads are super high
>>>>>>>>>>>> throughput. Share groups would struggle with those I’m sure.
>>>>>>>>>>>> Share groups do not diminish the value of consumer groups for
>>>>>>>>>>>> streaming. They just give another option for situations where
>>>>>>>>>>>> a different style of consumption is more appropriate.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I do have one question, though. When you subscribe to a topic
>>>>>>>>>>>>> using consumer groups, it might happen that one consumer has
>>>>>>>>>>>>> processed all messages from its partitions, while another one
>>>>>>>>>>>>> still has a lot of work to do (this might be due to
>>>>>>>>>>>>> unbalanced partitioning, long processing times etc.). In a
>>>>>>>>>>>>> message-queue approach, it would be great to solve this
>>>>>>>>>>>>> problem, so that a consumer that is free can steal work from
>>>>>>>>>>>>> other consumers. Is this somehow covered by share groups?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>>>>>
>>>>>>>>>>>>> "
>>>>>>>>>>>>> It manages the topic-partition assignments for the
>>>>>>>>>>>>> share-group members. An initial, trivial implementation would
>>>>>>>>>>>>> be to give each member the list of all topic-partitions which
>>>>>>>>>>>>> matches its subscriptions and then use the pull-based
>>>>>>>>>>>>> protocol to fetch records from all partitions. A more
>>>>>>>>>>>>> sophisticated implementation could use topic-partition load
>>>>>>>>>>>>> and lag metrics to distribute partitions among the consumers
>>>>>>>>>>>>> as a kind of autonomous, self-balancing partition assignment,
>>>>>>>>>>>>> steering more consumers to busier partitions, for example.
>>>>>>>>>>>>> Alternatively, a push-based fetching scheme could be used.
>>>>>>>>>>>>> Protocol details will follow later.
>>>>>>>>>>>>> "
>>>>>>>>>>>>>
>>>>>>>>>>>>> but I’m not sure if I understand this correctly. A
>>>>>>>>>>>>> fully-connected graph seems like a lot of connections, and
>>>>>>>>>>>>> I’m not sure if this would play well with streaming.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This also seems like one of the central problems, a key
>>>>>>>>>>>>> differentiator between share and consumer groups (the other
>>>>>>>>>>>>> one being persisting state of messages). And maybe the exact
>>>>>>>>>>>>> way we’d want to approach this would, to a certain degree,
>>>>>>>>>>>>> dictate the design of the queueing system?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Adam Warski
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues
>>>>>>>>>>>>>> for Kafka. This KIP proposes an alternative to consumer
>>>>>>>>>>>>>> groups to enable cooperative consumption by consumers
>>>>>>>>>>>>>> without partition assignment. You end up with queue
>>>>>>>>>>>>>> semantics on top of regular Kafka topics, with per-message
>>>>>>>>>>>>>> acknowledgement and automatic handling of messages which
>>>>>>>>>>>>>> repeatedly fail to be processed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Adam Warski
>>>>>>>>>>>
>>>>>>>>>>> https://www.softwaremill.com/
>>>>>>>>>>> https://twitter.com/adamwarski
>>
>>
>>
