There is another detail about EOS that I think is important.

Messages written into topic-partitions are only marked as "transactional"; when we commit (or abort), we just write an additional "tx marker" into the partition (the original message is not touched). If we delivered "pending" messages, the client would need additional logic to buffer them, plus logic to evaluate tx markers to determine when a pending record can be processed (if committed) or discarded (if aborted). The current client has nothing like this built in, because we don't need it (as explained in the earlier message about why we don't read beyond the LSO).
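Just to illustrate what that client-side logic would roughly look like (purely a sketch; none of these types exist in the real client): pending records would need to be held per producer until a tx marker for that producer resolves them.

import java.util.*;

// Hypothetical sketch only: buffering logic a client would need if the broker
// delivered "pending" transactional records plus tx markers. Nothing like this
// exists in the current consumer.
class PendingBuffer<T> {
    // Pending records grouped by producerId, in offset order.
    private final Map<Long, List<T>> pendingByProducer = new HashMap<>();

    // Buffer a record that is still part of an open transaction.
    void addPending(long producerId, T record) {
        pendingByProducer.computeIfAbsent(producerId, id -> new ArrayList<>()).add(record);
    }

    // Called when a tx marker for this producerId is observed: committed records
    // become deliverable, aborted ones are dropped.
    List<T> onTxMarker(long producerId, boolean committed) {
        List<T> records = pendingByProducer.remove(producerId);
        if (records == null || !committed) return List.of();
        return records;
    }
}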

Or we would need a different way to let the client know when a pending message is no longer pending, and whether it was committed or aborted. For example, we could change the client to always drop pending messages and make it the broker's responsibility to re-deliver them after they are committed. The client wouldn't need to buffer then (good); however, given how the broker works, doing it this way seems very undesirable.

Maybe there are other options? In the end, any of them is going to be much more complex, so it's not clear whether it would be worth the effort, or whether we should just do what we do now, not read beyond the LSO, and keep it simple.


-Matthias

On 7/10/23 2:43 AM, Dániel Urbán wrote:
Yes, I think it's clear now, thank you.
I agree that allowing reading beyond the LSO would require more work on the
broker side (we would need 1 more state for the messages, and a transition
when the LSO moves forward), but I don't see the extra complexity on the
consumer side. Based on the KIP so far, brokers will be able to return
specific batches/messages to queue consumers - consumers will need to be
able to skip messages in case another consumer of the same group has
already acquired/acked those. If we have this logic present in the protocol
and the clients, consumers could skip pending messages using the same
mechanism, and only the broker would need to know *why* exactly a specific
record/batch is skipped.

I don't think that this feature would be too important, but compared to the
complexity of the KIP, 1 more state doesn't seem too complicated to me.

Thanks,
Daniel

Matthias J. Sax <mj...@apache.org> wrote (on Mon, 10 Jul 2023, 7:22):

Daniel, sure.

To allow the client to filter aborted messages, the broker currently
attaches metadata that tells the client which records were aborted. But
the first message after the LSO is a message in pending state, ie, it
was neither committed nor aborted yet, so it's not possible to filter or
deliver it. Thus, the broker cannot provide this metadata (and I am not
sure the client could filter without it?).
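Roughly, the filtering relies on the broker-supplied list of aborted transactions (producerId plus first offset) in the fetch response; a simplified sketch of the idea (not the actual client code):

import java.util.*;

// Simplified sketch of read_committed filtering, not the actual client code.
// The fetch response tells the client which transactions in the returned range
// were aborted (producerId -> first offset of the aborted transaction).
class AbortedTxFilter {
    // Producers whose aborted transaction has started but whose marker has not
    // been reached yet, while scanning fetched batches in offset order.
    private final Set<Long> currentlyAborted = new HashSet<>();

    boolean shouldDeliver(long producerId, long offset, boolean isControlBatch,
                          Map<Long, Long> abortedTxFirstOffsets) {
        Long firstAborted = abortedTxFirstOffsets.get(producerId);
        if (firstAborted != null && offset >= firstAborted) {
            currentlyAborted.add(producerId);
        }
        if (isControlBatch) {                 // commit/abort marker ends the transaction
            currentlyAborted.remove(producerId);
            return false;                     // markers are never handed to the application
        }
        return !currentlyAborted.contains(producerId);
    }
}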

The main reason why this happens broker side is to avoid the client
having to buffer pending messages "indefinitely" until the TX eventually
commits or aborts, which would put a lot of memory pressure on the
client. For the "classic" case, the situation is more severe because we
guarantee ordered delivery, so the client would need to buffer
everything after the LSO. -- While this is relaxed for queuing, as we might
not guarantee order (ie, instead of buffering everything, only pending
messages must be buffered), it would still imply a huge additional
burden on tracking metadata (for both the broker and the consumer) and
on the wire protocol, and I am already worried about the metadata we might
need to track for queuing in general.

Does this make sense?


-Matthias



On 7/7/23 01:35, Dániel Urbán wrote:
Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed"
we can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted
records have to do with the LSO? As you said, aborted message filtering
is done on the client side (in consumers, yes, but not sure if it has to
be the same for queues), but being blocked on the LSO is the
responsibility of the broker, isn't it? My thought was that the broker
could act differently when working with queues and read_committed
isolation.
Thanks,
Daniel

On Thu, Jul 6, 2023 at 7:26 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

It seems we are at a very early stage, and some very important sections in
the KIP are still marked as TODO. In particular, I am curious about the
protocol changes, how the "queuing state" will be represented and made
durable, and all the error edge cases / fail-over / fencing
(broker/clients) that we need to put in place.


A few other comments/question from my side:

(1) Fetch from follower: this was already touched on, but the point is
really that the consumer does not decide about it, the broker does.
When a consumer sends its first fetch request, it will always go to the
leader, and the broker would reply to the consumer "go and fetch from
this other broker". -- I think it's ok to exclude fetch from follower in
the first version of the KIP, but it would need a broker change such
that the broker knows it's a "queue fetch" request. -- It would also be
worth exploring how fetch from follower could work in the future, to
ensure that our initial design allows for it and is future proof.


(2) Why do we not allow pattern subscription and what happens if
different consumers subscribe to different topics? It's not fully
explained in the KIP.


(3) auto.offset.reset and SPSO/SPEO -- I don't understand why we would
not allow auto.offset.reset? In the discussion, you mentioned that the
"first consumer would win, if two consumers have a different config" --
while this is correct, it's the same for a consumer group right now.
Maybe we should not try to solve a "non-problem"? -- In general, my
impression is that we are going to do Kafkaesque Queuing, which is fine,
but it might be to our advantage to carry over as many established
concepts as we can? And if not, we should have a very good reason not to.

In the end, I find it very clumsy to only have an admin API to change
the starting point of a consumer.

(3B) What happens if lag grows and data is purged broker side?

(3C) What happens if the broker released records (based on timeout /
exceeded delivery count) and the "ack/reject" comes afterwards?

(3D) How do we find out which records got archived but were not acked (ie,
lost) for re-processing/debugging purposes? The question was already
asked and the answer was "not supported", but I think it would be a
must-have before the feature is usable in production? We can of course
also do it only in a future release and not in the first "MVP"
implementation, but the KIP should address it. In the end, the overall
group monitoring story is missing.


(4) I am also wondering about the overall design with regard to "per
record" vs "per batch" granularity. In the end, queuing usually aims for
"per record" semantics, but "per record" implies keeping track of a lot
of metadata. Kafka is designed around "per batch" granularity, and it's
unclear to me how the two will go together?

(4A) Do we keep "ack/reject/..." state per record, or per batch? It
seems per record, but that would require holding a lot of metadata. Also,
how does it work with the current protocol if a batch is partially acked
and we need to re-deliver? Would we add metadata and let the client
filter acked messages (similar to how "read-committed" mode works)?
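To illustrate the amount of per-record metadata I have in mind, the broker would need to hold something like the following for every in-flight record (names purely illustrative, loosely following the record states described in the KIP):

// Illustrative only: per-record delivery state a share-partition leader might
// have to keep for every in-flight record. Not an actual implementation.
enum DeliveryState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

class InFlightRecord {
    final long offset;
    DeliveryState state = DeliveryState.AVAILABLE;
    int deliveryCount = 0;          // bumped each time the record is acquired
    long lockExpiryMs = -1;         // acquisition lock timeout, if ACQUIRED

    InFlightRecord(long offset) { this.offset = offset; }

    void acquire(long nowMs, long lockDurationMs) {
        state = DeliveryState.ACQUIRED;
        deliveryCount++;
        lockExpiryMs = nowMs + lockDurationMs;
    }

    // Lock expiry or an explicit "release" makes the record available again,
    // unless the delivery count limit has been exceeded.
    void release(int deliveryCountLimit) {
        state = (deliveryCount >= deliveryCountLimit)
                ? DeliveryState.ARCHIVED : DeliveryState.AVAILABLE;
        lockExpiryMs = -1;
    }
}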

(4B) What does "the share-partition leader prefers to return complete
record batches." exactly mean? "Prefers" is a fuzzy word. What happens
if we cannot return a complete record batch?

(4C) What happens if different consumers of the same group configure
different batch sizes for fetching records? How do we track the
corresponding metadata?

(4D)

In the situation where some records in a batch have been released or
rejected separately, subsequent fetches of those records are more
likely to
have gaps.

What does this mean?

(4E)

For efficiency, the consumer preferentially returns complete record sets
with no gaps

Can you elaborate on the details?


API contract:

(5A)
acks must be issued in the order in which the records appear

Why is this the case? Sounds like an arbitrary restriction to me? Can
you share your reasoning?


(5B) How do we "reject" (or just "release") all records of a batch at once?
It seems the API only allows acking all records of a batch at once.

(5C) Currently, a `ConsumerRecords` object may contain records from
different partitions. Would this still be the case?


(6) Group management / re-balancing:

(6A) The KIP should explain better how heart-beating works (was already
partially discussed). How does `max.poll.interval.ms` interact? Would it
trigger a "release" of records if violated?

(6B) You mentioned that a consumer that does not heartbeat would just be
removed from the group without a rebalance: Given the current design to
assign all partitions to every consumer in the group, that would be ok.
But as you mentioned in the KIP, we might want to be more clever with
regard to assigning partitions in the future, and I think we would
actually need to trigger a rebalance to avoid a later protocol change:
otherwise, partition X could be assigned to a single consumer and could
become an offline partition if we don't rebalance and re-assign it when
its current consumer stops heartbeating.


(7) Delivery Semantics

(7A) You state that we aim for "at-least-once delivery". But why could
we not also provide "at-most-once delivery" (ie, fire and forget)?

(7B) There was also a discussion about "read-committed" and reading
beyond the LSO, and what happens if consumers have different configs.
First, you need to understand that aborted records are filtered client
side, and thus for "read-committed" we can never read beyond the LSO,
and the same seems to apply for queuing. Second, if different clients
have different configs, it seems "ok" -- of course you can consider it a
misconfiguration, but the same issue applies to Kafka now. I am not
sure if we want to try fixing a non-issue? In the end, the user should
be able to configure the client however they want, and we should not
artificially restrict it IMHO.

Btw: "read-committed" mode does NOT give you "exactly-once"!

(7C)
Finally, this KIP does not include support for acknowledging delivery
using transactions for exactly-once semantics. Conceptually, this is quite
straightforward but would take changes to the API.

Doing exactly-once delivery is not possible. For a "consumer only"
application, talking about transactions does not make any sense, and
"conceptually, this is quite straightforward" is (sorry for being salty)
just a nonsense statement.


(8) API design:

(8A) Are we sure we want to overload the semantics of
`beginningOffsets/endOffsets` ?


(8B) Why does `close()` not commit?

(8C) Why do the following methods throw?
    - enforceRebalance
    - offsetsForTimes
    - pause / paused / resume


(9) Configs. In general, I am wondering how existing consumer configs
would work, to what extent configs would "disable" each other, and
how established mechanisms align.

(9A) Committing (auto-commit configs?)? Is there a consumer group
"status / lag" (in the end, if we don't commit an offset, how can users
monitor the progress of the group)? We could for example commit the SPSO
to give an upper bound on lag, without breaking existing tooling?
Or we could commit both SPSO and SPEO and change the consumer-group tool
to show both?

(9B) Your example code shows "enable.auto.commit=false" -- is this a
requirement? Overall, it seems we have an "auto commit" mechanism, so
should this config be `true` -- or why do we not actually allow for both
auto-commit and manual commit? The KIP does not really discuss the
tradeoffs / reasoning behind it.
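For reference, I am thinking of a setup roughly along these lines; `enable.auto.commit` is an existing consumer config, but the property for opting into a share group is just a placeholder here, since the KIP has not pinned this down:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch of a share-group consumer configuration. Only the standard consumer
// configs are real; "group.type" is a placeholder for whatever the KIP ends up
// using to opt into queue semantics.
public class ShareConsumerConfigSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // as in the KIP's example
        props.put("group.type", "share"); // placeholder, not a finalized config name
        return props;
    }
}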

(9C) Config names:

`share.group.enable` -> `shared.group.enabled` ?

`share.delivery.count.limit` -> Should this be a client config? And
should we add a broker config `max.share.delivery.count.limit`? This
would follow established Kafka patterns.

`share.record.lock.duration.ms` -> `default.share.record.lock.duration.ms` ?

(9D) Why do we have hard limits broker side (eg, a maximum of 10 for
`share.delivery.count.limit`)? Seems rather arbitrary. If there is no
good reason, should we ship with a good default but no hard-coded limit
on what the user can configure?

(9E) Should we have a client and a broker config for all new configs (ie,
the broker sets a max/min, but the client can pick within those bounds)?



(10) Couple of question about certain statements:

If any records in the batch were not acknowledged, they remain acquired
and will be presented to the application in response to a future poll.

Do the un-acked records stay in the consumer buffer? Or would
`ConsumerRecords` be "purged" for this case?


the share-partition leader guarantees that acknowledgements for the
records in a batch are performed atomically.

How are acks for partial batches handled? And do you want to ensure
atomicity?


and the Acquired state is not persisted. This minimises the amount of
share-partition state that has to be logged.

Is there any concern about excessive re-delivery in case of error and
a large "window"? What are the tradeoffs with regard to re-delivery vs
maintaining too much metadata?


Thanks for reading all this...



-Matthias



On 7/1/23 4:42 AM, Kamal Chandraprakash wrote:
Hi Andrew,

Thank you for the KIP -- interesting read. I have some questions:

101. "The calls to KafkaConsumer.acknowledge(ConsumerRecord,
AcknowledgeType) must be
issued in the order in which the records appear in the ConsumerRecords
object, which will
be in order of increasing offset for each share-partition"

If the share-consumer uses thread pool internally and acknowledges the
records in out-of-order fashion.
Will this use case be supported? The "Managing durable share-partition
state" have transitions where the
records are ack'ed in out-of-order fashion so want to confirm this.
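For example, if records are processed on a thread pool, I imagine the application would have to re-sequence the results itself before calling acknowledge; a rough sketch (acknowledge is the KIP's proposed API, everything else here is made up for illustration):

import java.util.*;

// Rough sketch: re-sequencing out-of-order processing results so that
// acknowledge() calls can still be issued in increasing-offset order, as the
// KIP requires. AcknowledgeType is a stand-in for the KIP's proposed enum; the
// surrounding types are simplified.
class AckSequencer {
    enum AcknowledgeType { ACCEPT, RELEASE, REJECT }

    private final NavigableMap<Long, AcknowledgeType> completed = new TreeMap<>();
    private long nextOffsetToAck;

    AckSequencer(long firstOffset) { this.nextOffsetToAck = firstOffset; }

    // Worker threads report results in any order; acks ready to be issued in
    // offset order are returned to the polling thread.
    synchronized List<Map.Entry<Long, AcknowledgeType>> complete(long offset, AcknowledgeType type) {
        completed.put(offset, type);
        List<Map.Entry<Long, AcknowledgeType>> ready = new ArrayList<>();
        while (!completed.isEmpty() && completed.firstKey() == nextOffsetToAck) {
            ready.add(completed.pollFirstEntry());
            nextOffsetToAck++;   // assumes consecutive offsets, for simplicity
        }
        return ready;            // caller invokes consumer.acknowledge(...) for each entry, in order
    }
}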

102. Will the configs be maintained at a fine-grained, per
topic-to-share-group level? Some share-consumer groups may want to increase
"record.lock.duration.ms" dynamically if record processing is taking longer
than usual during an external system outage/downtime.

103. Can we also define which of the consumer configs are applicable to
share-consumer groups? (eg) The `max.poll.interval.ms` default is 5 mins.
Will this config have any effect on the share consumers?

104. How will the consumer quota work? Will it be similar to the existing
consumer quota mechanism?

--
Kamal

On Wed, Jun 7, 2023 at 9:17 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Daniel,
True, I see your point. It’s analogous to a KafkaConsumer fetching
uncommitted records but not delivering them to the application.

Thanks,
Andrew

On 7 Jun 2023, at 16:38, Dániel Urbán <urb.dani...@gmail.com> wrote:

Hi Andrew,

I think the "pending" state could be the solution for reading beyond
the
LSO. Pending could indicate that a message is not yet available for
consumption (so they won't be offered for consumers), but with
transactions
ending, they can become "available". With a pending state, records
wouldn't
"disappear", they would simply not show up until they become
available
on
commit, or archived on abort.

Regardless, I understand that this might be some extra, unwanted
complexity, I just thought that with the message ordering guarantee
gone,
it would be a cool feature for share-groups. I've seen use-cases
where
the
LSO being blocked for an extended period of time caused huge lag for
traditional read_committed consumers, which could be completely
avoided
by
share-groups.

Thanks,
Daniel

Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed, 7 Jun 2023, 17:28):

Hi Daniel,
Kind of. I don’t want a transaction abort to cause disappearance of
records which are already in-flight. A “pending” state doesn’t seem
helpful for read_committed. There’s no such disappearance problem
for read_uncommitted.

Thanks,
Andrew

On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com>
wrote:

Hi Andrew,

I agree with having a single isolation.level for the whole group, it makes
sense.
As for:
"b) The default isolation level for a share group is read_committed, in
which case the SPSO and SPEO cannot move past the LSO."

With this limitation (SPEO not moving beyond the LSO), are you trying to
avoid handling the complexity of some kind of a "pending" state for the
uncommitted in-flight messages?

Thanks,
Daniel

Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Wed, 7 Jun 2023, 16:52):

Hi Daniel,
I’ve been thinking about this question and I think this area is a bit
tricky.

If there are some consumers in a share group with isolation level
read_uncommitted and other consumers with read_committed, they have
different expectations with regards to which messages are visible when
EOS comes into the picture. It seems to me that this is not necessarily
a good thing.

One option would be to support just read_committed in KIP-932. This means
it is unambiguous which records are in-flight, because they’re only
committed ones.

Another option would be to have the entire share group have an isolation
level, which again gives an unambiguous set of in-flight records but
without the restriction of permitting just read_committed behaviour.

So, my preference is for the following:
a) A share group has an isolation level that applies to all consumers in
the group.
b) The default isolation level for a share group is read_committed, in
which case the SPSO and SPEO cannot move past the LSO.
c) For a share group with read_uncommitted isolation level, the SPSO and
SPEO can move past the LSO.
d) The kafka_configs.sh tool or the AdminClient can be used to set a
non-default value for the isolation level for a share group. The value is
applied when the first member joins.

Thanks,
Andrew

On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com>
wrote:

Hi Andrew,
Thank you for the clarification. One follow-up to read_committed mode:
Taking the change in message ordering guarantees into account, does this
mean that in queues, share-group consumers will be able to consume
committed records AFTER the LSO?
Thanks,
Daniel

Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (on Fri, 2 Jun 2023, 10:39):

Hi Daniel,
Thanks for your questions.

1) Yes, read_committed fetch will still be possible.

2) You weren’t wrong that this is a broad question :)

Broadly speaking, I can see two ways of managing the in-flight records:
the share-partition leader does it, or the share-group coordinator does
it. I want to choose what works best, and I happen to have started with
trying the share-partition leader doing it. This is just a whiteboard
exercise at the moment, looking at the potential protocol flows and how
well it all hangs together. When I have something coherent and
understandable and worth reviewing, I’ll update the KIP with a proposal.

I think it’s probably worth doing a similar exercise for the share-group
coordinator way too. There are bound to be pros and cons, and I don’t
really mind which way prevails.

If the share-group coordinator does it, I already have experience of
efficient storage of in-flight record state in a way that scales and is
space-efficient. If the share-partition leader does it, storage of
in-flight state is a bit more tricky.

I think it’s worth thinking ahead to how EOS will work and also another
couple of enhancements (key-based ordering and acquisition lock
extension) so it’s somewhat future-proof.

Thanks,
Andrew

On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com>
wrote:

Hi Andrew,

Thank you for the KIP, exciting work you are doing :)
I have 2 questions:
1. I understand that EOS won't be supported for share-groups (yet), but
read_committed fetch will still be possible, correct?

2. I have a very broad question about the proposed solution: why not let
the share-group coordinator manage the states of the in-flight records?
I'm asking this because it seems to me that using the same pattern as the
existing group coordinator would
a, solve the durability of the message state storage (same method as the
one used by the current group coordinator)

b, pave the way for EOS with share-groups (same method as the one used by
the current group coordinator)

c, allow follower-fetching
I saw your point about this: "FFF gives freedom to fetch records from a
nearby broker, but it does not also give the ability to commit offsets to
a nearby broker"
But does it matter if message acknowledgement is not "local"? Supposedly,
fetching is the actual hard work which benefits from follower fetching,
not the group related requests.

The only problem I see with the share-group coordinator managing the
in-flight message state is that the coordinator is not aware of the exact
available offsets of a partition, nor how the messages are batched. For
this problem, maybe the share group coordinator could use some form of
"logical" addresses, such as "the next 2 batches after offset X", or
"after offset X, skip 2 batches, fetch next 2". Acknowledgements always
contain the exact offset, but for the "unknown" sections of a partition,
these logical addresses would be used. The coordinator could keep track
of message states with a mix of offsets and these batch-based addresses.
The partition leader could support "skip X, fetch Y batches" fetch
requests. This solution would need changes in the Fetch API to allow such
batch-based addresses, but I assume that fetch protocol changes will be
needed regardless of the specific solution.
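For example, such a logical address could be as simple as this (purely illustrative, not something from the KIP):

// Purely illustrative: a "logical address" for a section of the log the
// coordinator has not seen yet, expressed relative to a known offset. The
// partition leader would resolve it to exact offsets at fetch time.
public record BatchAddress(long afterOffset, int skipBatches, int fetchBatches) {

    public static void main(String[] args) {
        // "after offset 100, skip 2 batches, fetch the next 2"
        BatchAddress addr = new BatchAddress(100L, 2, 2);
        System.out.println("skip " + addr.skipBatches() + " batches after offset "
                + addr.afterOffset() + ", then return " + addr.fetchBatches() + " batches");
    }
}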

Thanks,
Daniel

Andrew Schofield <andrew_schofi...@live.com> wrote (on Tue, 30 May 2023, 18:15):

Yes, that’s it. I imagine something similar to KIP-848 for managing the
share group membership, and consumers that fetch records from their
assigned partitions and acknowledge when delivery completes.

Thanks,
Andrew

On 30 May 2023, at 16:52, Adam Warski <a...@warski.org>
wrote:

Thanks for the explanation!

So effectively, a share group is subscribed to each partition - but the
data is not pushed to the consumer, but only sent on demand. And when
demand is signalled, a batch of messages is sent?
Hence it would be up to the consumer to prefetch a sufficient number of
batches to ensure that it will never be "bored"?

Adam

On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com> wrote:

Hi Adam,
Thanks for your question.

With a share group, each fetch is able to grab available records from
any partition. So, it alleviates the “head-of-line” blocking problem
where a slow consumer gets in the way. There’s no actual stealing from a
slow consumer, but it can be overtaken and must complete its processing
within the timeout.

The way I see this working is that when a consumer joins a share group,
it receives a set of assigned share-partitions. To start with, every
consumer will be assigned all partitions. We can be smarter than that,
but I think that’s really a question of writing a smarter assignor just
as has occurred over the years with consumer groups.

Only a small proportion of Kafka workloads are super high throughput.
Share groups would struggle with those I’m sure. Share groups do not
diminish the value of consumer groups for streaming. They just give
another option for situations where a different style of consumption is
more appropriate.

Thanks,
Andrew

On 29 May 2023, at 17:18, Adam Warski <a...@warski.org>
wrote:

Hello,

thank you for the proposal! A very interesting read.

I do have one question, though. When you subscribe to a topic using
consumer groups, it might happen that one consumer has processed all
messages from its partitions, while another one still has a lot of work
to do (this might be due to unbalanced partitioning, long processing
times etc.). In a message-queue approach, it would be great to solve this
problem - so that a consumer that is free can steal work from other
consumers. Is this somehow covered by share groups?

Maybe this is planned as "further work", as indicated here:

"
It manages the topic-partition assignments for the share-group members.
An initial, trivial implementation would be to give each member the list
of all topic-partitions which matches its subscriptions and then use the
pull-based protocol to fetch records from all partitions. A more
sophisticated implementation could use topic-partition load and lag
metrics to distribute partitions among the consumers as a kind of
autonomous, self-balancing partition assignment, steering more consumers
to busier partitions, for example. Alternatively, a push-based fetching
scheme could be used. Protocol details will follow later.
"

but I’m not sure if I understand this correctly. A fully-connected graph
seems like a lot of connections, and I’m not sure if this would play well
with streaming.

This also seems like one of the central problems - a key differentiator
between share and consumer groups (the other one being persisting the
state of messages). And maybe the exact way we’d want to approach this
would, to a certain degree, dictate the design of the queueing system?

Best,
Adam Warski

On 2023/05/15 11:55:14 Andrew Schofield wrote:
Hi,
I would like to start a discussion thread on KIP-932: Queues for Kafka.
This KIP proposes an alternative to consumer groups to enable cooperative
consumption by consumers without partition assignment. You end up with
queue semantics on top of regular Kafka topics, with per-message
acknowledgement and automatic handling of messages which repeatedly fail
to be processed.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

Please take a look and let me know what you think.

Thanks.
Andrew



--
Adam Warski

https://www.softwaremill.com/
https://twitter.com/adamwarski